""" 统一配置加载。从 core/config/pipeline_config.yaml 读取基础配置, 从 core/config/.env 或环境变量覆盖敏感凭证。 所有路径均基于 core/ 目录内部,不依赖外部 models/。 """ import os from pathlib import Path from typing import Any, Dict, List, Optional import yaml # core/ 目录(本文件所在目录) CORE_ROOT = Path(__file__).resolve().parent def _resolve_env_vars(value: Any) -> Any: """递归解析 ${VAR} 格式的环境变量引用。""" if isinstance(value, str) and value.startswith("${") and value.endswith("}"): env_key = value[2:-1] if ":-" in env_key: env_key, default = env_key.split(":-", 1) return os.environ.get(env_key, default) return os.environ.get(env_key, value) if isinstance(value, dict): return {k: _resolve_env_vars(v) for k, v in value.items()} if isinstance(value, list): return [_resolve_env_vars(v) for v in value] return value def _load_dotenv(dotenv_path: Optional[Path] = None) -> None: if dotenv_path is None: dotenv_path = CORE_ROOT / "config" / ".env" if not dotenv_path.exists(): return with open(dotenv_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" not in line: continue key, _, val = line.partition("=") key = key.strip() val = val.strip().strip("'\"") if key not in os.environ: os.environ[key] = val class PipelineConfig: """管线统一配置,所有路径基于 core/ 目录。""" def __init__(self, config_path: Optional[str] = None, plant: str = "xishan"): _load_dotenv() if config_path is None: config_path = str(CORE_ROOT / "config" / "pipeline_config.yaml") with open(config_path, "r", encoding="utf-8") as f: raw = yaml.safe_load(f) self._data: Dict[str, Any] = _resolve_env_vars(raw) self.plant = plant # ---- API ---- @property def api_base_url(self) -> str: return self._data["api"]["base_url"] @property def project_id(self) -> int: return int(self._data["api"].get("project_id", 92)) # ---- Login ---- @property def login_user(self) -> str: return self._data["api"].get("login_user", "admin") @property def login_password(self) -> str: return self._data["api"].get("login_password", "") @property def login_dep_id(self) -> str: return str(self._data["api"].get("login_dep_id", "135")) # ---- Database ---- @property def db_config(self) -> Dict[str, Any]: return dict(self._data.get("database", {})) # ---- SCADA ---- @property def scada_secret(self) -> str: return self._data["scada"]["secret"] # ---- Dry Run ---- @property def dry_run(self) -> bool: return self._data.get("dry_run", True) # ---- UF ---- @property def uf_model_filename(self) -> str: return self._data["uf"].get("model_filename", "48h_dqn_model.zip") @property def uf_is_times(self) -> bool: return self._data["uf"].get("is_times", False) @property def uf_trigger_value(self) -> int: return int(self._data["uf"].get("trigger_value", 95)) @property def uf_poll_interval(self) -> int: return int(self._data["uf"].get("poll_interval", 2)) @property def uf_units(self) -> List[str]: units_map = self._data["uf"].get("units", {}) if isinstance(units_map, dict): return units_map.get(self.plant, ["UF1"]) return units_map # ---- Diagnosis ---- @property def diagnosis_interval_minutes(self) -> int: return int(self._data["diagnosis"].get("interval_minutes", 40)) # ---- RO ---- @property def ro_tmp_limit(self) -> float: return float(self._data["ro"].get("tmp_limit", 0.21)) @property def ro_dpt_type(self) -> str: return self._data["ro"].get("dpt_type", "DPT_1") @property def ro_units(self) -> List[str]: units_map = self._data["ro"].get("units", {}) if isinstance(units_map, dict): return units_map.get(self.plant, ["RO1", "RO2"]) return units_map @property def ro_schedule(self) -> str: return self._data["ro"].get("schedule", "daily") @property def ro_history_paths(self) -> Dict[str, str]: return self._data["ro"].get("history_paths", {}) # ---- Paths (全部基于 core/models/) ---- @property def uf_root(self) -> Path: return CORE_ROOT / "models" / "uf-rl" @property def uf_model_path(self) -> Path: return self.uf_root / self.plant / self.uf_model_filename @property def uf_env_config_path(self) -> Path: return self.uf_root / self.plant / "env_config.yaml" @property def diagnosis_root(self) -> Path: return CORE_ROOT / "models" / "dynamic_anomaly_diagnosis" @property def ro_root(self) -> Path: return CORE_ROOT / "models" / "ro_mechanism_predict" # ---- General ---- def get(self, key_path: str, default: Any = None) -> Any: keys = key_path.split(".") value = self._data for k in keys: if isinstance(value, dict) and k in value: value = value[k] else: return default return value