| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- """
- 统一配置加载。从 core/config/pipeline_config.yaml 读取基础配置,
- 从 core/config/.env 或环境变量覆盖敏感凭证。
- 所有路径均基于 core/ 目录内部,不依赖外部 models/。
- """
- import os
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- import yaml
# The core/ directory, i.e. the directory that contains this file.
CORE_ROOT = Path(__file__).resolve().parent
def _resolve_env_vars(value: Any) -> Any:
    """Recursively resolve ``${VAR}`` environment-variable references.

    Only strings that consist *entirely* of one reference are substituted:

    * ``${VAR}``          -> the value of ``VAR``; when ``VAR`` is not set the
      original placeholder string is returned unchanged.
    * ``${VAR:-default}`` -> the value of ``VAR``, or ``default`` when ``VAR``
      is unset **or empty** (the shell meaning of the ``:-`` operator).

    Dicts and lists are walked recursively and rebuilt; any other value is
    returned as-is.

    Args:
        value: A parsed configuration node (scalar, list or dict).

    Returns:
        A structure of the same shape with references replaced.
    """
    if isinstance(value, dict):
        return {k: _resolve_env_vars(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_resolve_env_vars(v) for v in value]
    if not (
        isinstance(value, str)
        and value.startswith("${")
        and value.endswith("}")
    ):
        return value

    reference = value[2:-1]
    name, separator, default = reference.partition(":-")
    if not separator:
        # No default supplied: keep the placeholder visible when unset so a
        # missing variable is easy to spot downstream.
        return os.environ.get(name, value)
    # ":-" falls back for unset AND empty variables, hence `or` rather than
    # the two-argument form of `.get()`.
    return os.environ.get(name) or default
def _load_dotenv(dotenv_path: Optional[Path] = None) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv file into ``os.environ``.

    Variables that already exist in the process environment are never
    overwritten, so real environment variables take precedence over the file.

    Parsing rules:

    * blank lines and lines starting with ``#`` are ignored;
    * lines without ``=`` or with an empty key are ignored;
    * an optional leading ``export `` is accepted;
    * one pair of *matching* surrounding quotes (``'...'`` or ``"..."``) is
      removed from the value; unbalanced quotes are kept verbatim.

    Args:
        dotenv_path: File to read. Defaults to ``CORE_ROOT / "config" / ".env"``.
            A missing file is silently skipped.
    """
    if dotenv_path is None:
        dotenv_path = CORE_ROOT / "config" / ".env"
    if not dotenv_path.exists():
        return

    # utf-8-sig reads plain UTF-8 as well, but drops a BOM that would
    # otherwise become part of the first key name.
    with open(dotenv_path, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            if line.startswith("export "):
                line = line[len("export "):].lstrip()

            key, separator, val = line.partition("=")
            key = key.strip()
            # An empty key would make os.environ raise ValueError.
            if not separator or not key:
                continue

            val = val.strip()
            # Strip exactly one matching pair of quotes; a blanket
            # strip("'\"") would corrupt values that legitimately begin or
            # end with a quote character.
            if len(val) >= 2 and val[0] == val[-1] and val[0] in "'\"":
                val = val[1:-1]

            os.environ.setdefault(key, val)
class PipelineConfig:
    """Unified pipeline configuration; every path is rooted at the core/ directory.

    The YAML file is parsed once in ``__init__`` and ``${VAR}`` references are
    resolved against the environment. Because resolved values arrive as
    strings, the typed accessors below coerce them (``int``/``float``/bool)
    before returning.

    A missing optional section (``uf``, ``ro``, ``diagnosis``, ``database``,
    ``api``) is treated as empty so the documented per-key defaults apply.
    Keys without a default (``api.base_url``, ``scada.secret``) still raise
    ``KeyError`` when absent.
    """

    # Case-insensitive spellings accepted for boolean options.
    _TRUE_TOKENS = frozenset({"1", "true", "yes", "on", "y", "t"})
    _FALSE_TOKENS = frozenset({"", "0", "false", "no", "off", "n", "f"})

    def __init__(self, config_path: Optional[str] = None, plant: str = "xishan"):
        """Load the dotenv file, then the YAML configuration.

        Args:
            config_path: YAML file to read. Defaults to
                ``CORE_ROOT / "config" / "pipeline_config.yaml"``.
            plant: Plant identifier used to select per-plant units and model
                directories.

        Raises:
            ValueError: If the YAML document's top level is not a mapping.
        """
        _load_dotenv()
        if config_path is None:
            config_path = str(CORE_ROOT / "config" / "pipeline_config.yaml")
        with open(config_path, "r", encoding="utf-8") as f:
            raw = yaml.safe_load(f)
        if raw is None:
            # safe_load returns None for an empty document.
            raw = {}
        elif not isinstance(raw, dict):
            raise ValueError(
                f"Top level of {config_path} must be a mapping, "
                f"got {type(raw).__name__}"
            )
        self._data: Dict[str, Any] = _resolve_env_vars(raw)
        self.plant = plant

    # ---- Internal helpers ----

    def _section(self, name: str) -> Dict[str, Any]:
        """Return the top-level mapping *name*, or ``{}`` if missing or not a mapping."""
        section = self._data.get(name)
        return section if isinstance(section, dict) else {}

    @classmethod
    def _as_bool(cls, value: Any, default: bool) -> bool:
        """Coerce a YAML or environment value to ``bool``.

        Real booleans pass through. Strings are matched case-insensitively
        against the accepted true/false spellings; ``None`` and unrecognised
        strings (e.g. an unresolved ``${VAR}`` placeholder) yield *default*.
        Any other type uses normal truthiness.
        """
        if isinstance(value, bool):
            return value
        if value is None:
            return default
        if isinstance(value, str):
            token = value.strip().lower()
            if token in cls._TRUE_TOKENS:
                return True
            if token in cls._FALSE_TOKENS:
                return False
            return default
        return bool(value)

    def _plant_units(self, section: str, fallback: List[str]) -> List[str]:
        """Return the unit list configured under ``<section>.units``.

        ``units`` may be a mapping of plant name to list (the entry for
        ``self.plant`` is used, else *fallback*) or a plain list that applies
        to every plant. A missing or null ``units`` yields *fallback*.
        """
        units = self._section(section).get("units")
        if units is None:
            return fallback
        if isinstance(units, dict):
            return units.get(self.plant, fallback)
        return units

    # ---- API ----

    @property
    def api_base_url(self) -> str:
        """Base URL of the API (required; ``KeyError`` if absent)."""
        return self._section("api")["base_url"]

    @property
    def project_id(self) -> int:
        """Project id, default 92."""
        return int(self._section("api").get("project_id", 92))

    # ---- Login ----

    @property
    def login_user(self) -> str:
        """Login user name, default ``"admin"``."""
        return self._section("api").get("login_user", "admin")

    @property
    def login_password(self) -> str:
        """Login password, default empty string."""
        return self._section("api").get("login_password", "")

    @property
    def login_dep_id(self) -> str:
        """Login department id as a string, default ``"135"``."""
        return str(self._section("api").get("login_dep_id", "135"))

    # ---- Database ----

    @property
    def db_config(self) -> Dict[str, Any]:
        """Shallow copy of the ``database`` section (empty dict if absent)."""
        return dict(self._section("database"))

    # ---- SCADA ----

    @property
    def scada_secret(self) -> str:
        """SCADA secret (required; ``KeyError`` if absent)."""
        return self._section("scada")["secret"]

    # ---- Dry Run ----

    @property
    def dry_run(self) -> bool:
        """Whether the pipeline runs in dry-run mode; defaults to ``True``."""
        return self._as_bool(self._data.get("dry_run"), True)

    # ---- UF ----

    @property
    def uf_model_filename(self) -> str:
        """UF model file name, default ``"48h_dqn_model.zip"``."""
        return self._section("uf").get("model_filename", "48h_dqn_model.zip")

    @property
    def uf_is_times(self) -> bool:
        """The ``uf.is_times`` flag; defaults to ``False``."""
        return self._as_bool(self._section("uf").get("is_times"), False)

    @property
    def uf_trigger_value(self) -> int:
        """The ``uf.trigger_value`` setting, default 95."""
        return int(self._section("uf").get("trigger_value", 95))

    @property
    def uf_poll_interval(self) -> int:
        """The ``uf.poll_interval`` setting, default 2."""
        return int(self._section("uf").get("poll_interval", 2))

    @property
    def uf_units(self) -> List[str]:
        """UF units for the current plant, default ``["UF1"]``."""
        return self._plant_units("uf", ["UF1"])

    # ---- Diagnosis ----

    @property
    def diagnosis_interval_minutes(self) -> int:
        """The ``diagnosis.interval_minutes`` setting, default 40."""
        return int(self._section("diagnosis").get("interval_minutes", 40))

    # ---- RO ----

    @property
    def ro_tmp_limit(self) -> float:
        """The ``ro.tmp_limit`` setting, default 0.21."""
        return float(self._section("ro").get("tmp_limit", 0.21))

    @property
    def ro_dpt_type(self) -> str:
        """The ``ro.dpt_type`` setting, default ``"DPT_1"``."""
        return self._section("ro").get("dpt_type", "DPT_1")

    @property
    def ro_units(self) -> List[str]:
        """RO units for the current plant, default ``["RO1", "RO2"]``."""
        return self._plant_units("ro", ["RO1", "RO2"])

    @property
    def ro_schedule(self) -> str:
        """The ``ro.schedule`` setting, default ``"daily"``."""
        return self._section("ro").get("schedule", "daily")

    @property
    def ro_history_paths(self) -> Dict[str, str]:
        """The ``ro.history_paths`` mapping (empty dict if absent or null)."""
        return self._section("ro").get("history_paths") or {}

    # ---- Paths (all rooted at core/models/) ----

    @property
    def uf_root(self) -> Path:
        """Root directory of the UF model package."""
        return CORE_ROOT / "models" / "uf-rl"

    @property
    def uf_model_path(self) -> Path:
        """Path of the UF model file for the current plant."""
        return self.uf_root / self.plant / self.uf_model_filename

    @property
    def uf_env_config_path(self) -> Path:
        """Path of ``env_config.yaml`` for the current plant."""
        return self.uf_root / self.plant / "env_config.yaml"

    @property
    def diagnosis_root(self) -> Path:
        """Root directory of the anomaly-diagnosis model package."""
        return CORE_ROOT / "models" / "dynamic_anomaly_diagnosis"

    @property
    def ro_root(self) -> Path:
        """Root directory of the RO prediction model package."""
        return CORE_ROOT / "models" / "ro_mechanism_predict"

    # ---- General ----

    def get(self, key_path: str, default: Any = None) -> Any:
        """Look up a dotted path such as ``"api.base_url"``.

        Args:
            key_path: Keys joined by ``.``, walked from the top level down.
            default: Returned when any step is missing or not a mapping.

        Returns:
            The value found at *key_path*, or *default*.
        """
        node: Any = self._data
        for part in key_path.split("."):
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
        return node
|