config.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. """
  2. 统一配置加载。从 core/config/pipeline_config.yaml 读取基础配置,
  3. 从 core/config/.env 或环境变量覆盖敏感凭证。
  4. 所有路径均基于 core/ 目录内部,不依赖外部 models/。
  5. """
  6. import os
  7. from pathlib import Path
  8. from typing import Any, Dict, List, Optional
  9. import yaml
  # The core/ directory, i.e. the directory that contains this file.
  CORE_ROOT = Path(__file__).resolve().parents[0]
  def _expand_env_reference(reference: str) -> str:
      """Resolve one ``${NAME}`` or ``${NAME:-default}`` reference.

      Args:
          reference: The complete reference text, including ``${`` and ``}``.

      Returns:
          The value of ``NAME`` from ``os.environ`` when it is set; otherwise
          the text after ``:-`` when a default is given; otherwise
          ``reference`` itself, unchanged.
      """
      name, separator, default = reference[2:-1].partition(":-")
      if separator:
          return os.environ.get(name, default)
      # No default supplied: keep the literal reference so an unset variable
      # remains visible in the resolved configuration.
      return os.environ.get(name, reference)


  def _resolve_env_vars(value: Any) -> Any:
      """Recursively expand ``${VAR}`` / ``${VAR:-default}`` references.

      Strings have every ``${...}`` reference replaced independently, so
      values holding several references (``"${HOST}:${PORT:-80}"``) or a
      reference embedded in other text (``"http://${HOST}/api"``) are
      expanded too. Dicts and lists are rebuilt with their values/items
      resolved; dict keys are left as they are. Any other type is returned
      unchanged.

      Args:
          value: A string, dict, list, or any other object.

      Returns:
          A resolved copy for strings, dicts and lists; ``value`` itself for
          everything else. Expanded references always yield strings.
      """
      # Imported locally so this function does not rely on a module-level
      # ``import re`` being present.
      import re

      if isinstance(value, str):
          if "${" not in value:
              return value
          # A callable replacement is inserted verbatim, so backslashes in
          # environment values are not interpreted as escapes.
          return re.sub(
              r"\$\{[^}]*\}",
              lambda match: _expand_env_reference(match.group(0)),
              value,
          )
      if isinstance(value, dict):
          return {k: _resolve_env_vars(v) for k, v in value.items()}
      if isinstance(value, list):
          return [_resolve_env_vars(v) for v in value]
      return value
  def _load_dotenv(dotenv_path: Optional[Path] = None) -> None:
      """Load ``KEY=VALUE`` pairs from a dotenv file into ``os.environ``.

      Variables that already exist in the environment are never overwritten.
      Blank lines, ``#`` comment lines, lines without ``=`` and lines with an
      empty key are skipped. A leading ``export `` on the key is dropped, and
      one matching pair of surrounding quotes is removed from the value.

      Args:
          dotenv_path: File to read. When ``None``, ``config/.env`` under
              ``CORE_ROOT`` is used. Nothing happens if it is not a file.
      """
      if dotenv_path is None:
          dotenv_path = CORE_ROOT / "config" / ".env"
      if not dotenv_path.is_file():
          return

      # "utf-8-sig" drops a leading BOM so it cannot end up inside the first key.
      with open(dotenv_path, "r", encoding="utf-8-sig") as f:
          for raw_line in f:
              line = raw_line.strip()
              if not line or line.startswith("#"):
                  continue

              key, separator, val = line.partition("=")
              if not separator:
                  continue

              key = key.strip()
              if key.startswith("export "):
                  key = key[len("export "):].strip()
              if not key:
                  # An empty name cannot be stored in os.environ.
                  continue

              val = val.strip()
              # Remove exactly one matching pair of quotes; quote characters
              # that belong to the value itself (e.g. in a password) survive.
              if len(val) >= 2 and val[0] == val[-1] and val[0] in "'\"":
                  val = val[1:-1]

              os.environ.setdefault(key, val)
  class PipelineConfig:
      """Unified pipeline configuration; all paths are rooted at ``CORE_ROOT``.

      The YAML document is passed through ``_resolve_env_vars`` before use, so
      a scalar may arrive as a string. Numeric and boolean properties
      therefore coerce their value to the annotated type.

      Properties that declare a default also fall back to it when their whole
      section is absent. ``api_base_url`` and ``scada_secret`` have no default
      and raise ``KeyError`` when missing.
      """

      def __init__(self, config_path: Optional[str] = None, plant: str = "xishan"):
          """Load the configuration file.

          Args:
              config_path: YAML file to read. Defaults to
                  ``config/pipeline_config.yaml`` under ``CORE_ROOT``.
              plant: Plant key used to select per-plant unit lists and model
                  directories.

          Raises:
              ValueError: If the top level of the YAML document is not a mapping.
          """
          # Called before the YAML is read so ${VAR} references can be resolved.
          _load_dotenv()
          if config_path is None:
              config_path = str(CORE_ROOT / "config" / "pipeline_config.yaml")
          with open(config_path, "r", encoding="utf-8") as f:
              raw = yaml.safe_load(f)
          if raw is None:
              # An empty YAML document loads as None; treat it as "no settings".
              raw = {}
          if not isinstance(raw, dict):
              raise ValueError(
                  f"Top level of {config_path} must be a mapping, "
                  f"got {type(raw).__name__}"
              )
          self._data: Dict[str, Any] = _resolve_env_vars(raw)
          self.plant = plant

      # ---- Internal helpers ----

      def _section(self, name: str) -> Dict[str, Any]:
          """Return the top-level mapping ``name``, or ``{}`` if absent or not a mapping."""
          section = self._data.get(name)
          return section if isinstance(section, dict) else {}

      @staticmethod
      def _as_bool(value: Any) -> bool:
          """Coerce a configuration value to ``bool``.

          Strings are compared case-insensitively after stripping whitespace:
          ``""``, ``"0"``, ``"false"``, ``"no"`` and ``"off"`` are false, any
          other string is true. Non-strings use normal truthiness.
          """
          if isinstance(value, str):
              return value.strip().lower() not in {"", "0", "false", "no", "off"}
          return bool(value)

      # ---- API ----

      @property
      def api_base_url(self) -> str:
          """``api.base_url``; raises ``KeyError`` when not configured."""
          return self._data["api"]["base_url"]

      @property
      def project_id(self) -> int:
          """``api.project_id`` as an int (default 92)."""
          return int(self._section("api").get("project_id", 92))

      # ---- Login ----

      @property
      def login_user(self) -> str:
          """``api.login_user`` (default ``"admin"``)."""
          return self._section("api").get("login_user", "admin")

      @property
      def login_password(self) -> str:
          """``api.login_password`` (default empty string)."""
          return self._section("api").get("login_password", "")

      @property
      def login_dep_id(self) -> str:
          """``api.login_dep_id`` as a string (default ``"135"``)."""
          return str(self._section("api").get("login_dep_id", "135"))

      # ---- Database ----

      @property
      def db_config(self) -> Dict[str, Any]:
          """A shallow copy of the ``database`` section (empty when absent)."""
          return dict(self._section("database"))

      # ---- SCADA ----

      @property
      def scada_secret(self) -> str:
          """``scada.secret``; raises ``KeyError`` when not configured."""
          return self._data["scada"]["secret"]

      # ---- Dry Run ----

      @property
      def dry_run(self) -> bool:
          """Top-level ``dry_run`` flag as a bool (default ``True``)."""
          return self._as_bool(self._data.get("dry_run", True))

      # ---- UF ----

      @property
      def uf_model_filename(self) -> str:
          """``uf.model_filename`` (default ``"48h_dqn_model.zip"``)."""
          return self._section("uf").get("model_filename", "48h_dqn_model.zip")

      @property
      def uf_is_times(self) -> bool:
          """``uf.is_times`` as a bool (default ``False``)."""
          return self._as_bool(self._section("uf").get("is_times", False))

      @property
      def uf_trigger_value(self) -> int:
          """``uf.trigger_value`` as an int (default 95)."""
          return int(self._section("uf").get("trigger_value", 95))

      @property
      def uf_poll_interval(self) -> int:
          """``uf.poll_interval`` as an int (default 2)."""
          return int(self._section("uf").get("poll_interval", 2))

      @property
      def uf_units(self) -> List[str]:
          """UF units for the current plant.

          ``uf.units`` may be a mapping keyed by plant (falling back to
          ``["UF1"]`` for an unknown plant) or a value shared by all plants,
          which is returned as-is.
          """
          units_map = self._section("uf").get("units", {})
          if isinstance(units_map, dict):
              return units_map.get(self.plant, ["UF1"])
          return units_map

      # ---- Diagnosis ----

      @property
      def diagnosis_interval_minutes(self) -> int:
          """``diagnosis.interval_minutes`` as an int (default 40)."""
          return int(self._section("diagnosis").get("interval_minutes", 40))

      # ---- RO ----

      @property
      def ro_tmp_limit(self) -> float:
          """``ro.tmp_limit`` as a float (default 0.21)."""
          return float(self._section("ro").get("tmp_limit", 0.21))

      @property
      def ro_dpt_type(self) -> str:
          """``ro.dpt_type`` (default ``"DPT_1"``)."""
          return self._section("ro").get("dpt_type", "DPT_1")

      @property
      def ro_units(self) -> List[str]:
          """RO units for the current plant.

          ``ro.units`` may be a mapping keyed by plant (falling back to
          ``["RO1", "RO2"]`` for an unknown plant) or a value shared by all
          plants, which is returned as-is.
          """
          units_map = self._section("ro").get("units", {})
          if isinstance(units_map, dict):
              return units_map.get(self.plant, ["RO1", "RO2"])
          return units_map

      @property
      def ro_schedule(self) -> str:
          """``ro.schedule`` (default ``"daily"``)."""
          return self._section("ro").get("schedule", "daily")

      @property
      def ro_history_paths(self) -> Dict[str, str]:
          """``ro.history_paths`` (default empty mapping)."""
          return self._section("ro").get("history_paths", {})

      # ---- Paths (all under core/models/) ----

      @property
      def uf_root(self) -> Path:
          """``models/uf-rl`` under ``CORE_ROOT``."""
          return CORE_ROOT / "models" / "uf-rl"

      @property
      def uf_model_path(self) -> Path:
          """The UF model file inside the current plant's directory."""
          return self.uf_root / self.plant / self.uf_model_filename

      @property
      def uf_env_config_path(self) -> Path:
          """``env_config.yaml`` inside the current plant's UF directory."""
          return self.uf_root / self.plant / "env_config.yaml"

      @property
      def diagnosis_root(self) -> Path:
          """``models/dynamic_anomaly_diagnosis`` under ``CORE_ROOT``."""
          return CORE_ROOT / "models" / "dynamic_anomaly_diagnosis"

      @property
      def ro_root(self) -> Path:
          """``models/ro_mechanism_predict`` under ``CORE_ROOT``."""
          return CORE_ROOT / "models" / "ro_mechanism_predict"

      # ---- General ----

      def get(self, key_path: str, default: Any = None) -> Any:
          """Look up a value by dotted path, e.g. ``"uf.units.xishan"``.

          Args:
              key_path: Keys joined with ``.``, walked from the top level.
              default: Returned when a key is missing or an intermediate
                  value is not a mapping.

          Returns:
              The value found at ``key_path``, or ``default``.
          """
          value = self._data
          for k in key_path.split("."):
              if not isinstance(value, dict) or k not in value:
                  return default
              value = value[k]
          return value