"""配置读取客户端""" import os from typing import Optional, Any, Dict from functools import lru_cache from .http_client import PlatformHttpClient class ConfigReader: """配置读取器 从平台服务读取租户配置 使用示例: config = ConfigReader(tenant_id=1) # 读取单个配置 value = await config.get("wechat", "app_id") # 读取配置组 wechat_config = await config.get_group("wechat") """ def __init__( self, tenant_id: int, platform_url: Optional[str] = None, api_key: Optional[str] = None, cache_ttl: int = 300 # 缓存时间(秒) ): self.tenant_id = tenant_id self.platform_url = platform_url or os.getenv("PLATFORM_URL", "") self.api_key = api_key or os.getenv("PLATFORM_API_KEY", "") self.cache_ttl = cache_ttl self._client = PlatformHttpClient( base_url=self.platform_url, api_key=self.api_key ) # 本地缓存 self._cache: Dict[str, Any] = {} async def get( self, config_type: str, config_key: str, default: Any = None ) -> Any: """读取配置值 Args: config_type: 配置类型 config_key: 配置键 default: 默认值 Returns: 配置值 """ cache_key = f"{config_type}:{config_key}" # 检查缓存 if cache_key in self._cache: return self._cache[cache_key] # 从平台获取 try: response = await self._client.get( f"/api/config/{config_type}/{config_key}", params={"tenant_id": self.tenant_id} ) if response.status_code == 200: data = response.json() value = data.get("config_value", default) self._cache[cache_key] = value return value elif response.status_code == 404: return default else: raise Exception(f"Config read failed: {response.status_code}") except Exception as e: # 失败时返回默认值 return default def get_env( self, env_key: str, default: Any = None ) -> Any: """从环境变量读取配置(同步方法) Args: env_key: 环境变量名 default: 默认值 Returns: 配置值 """ return os.getenv(env_key, default) def clear_cache(self): """清除缓存""" self._cache.clear()