loader.py 1007 B

123456789101112131415161718192021222324252627282930313233
  1. import pandas as pd
  2. from pathlib import Path
  3. def resolve_path(project_root: Path, relative_path: str) -> Path:
  4. return project_root / relative_path
  5. def load_all_units_cycles(cfg: dict) -> pd.DataFrame:
  6. """
  7. 读取所有启用 UF 机组的 filtered_cycles 数据,
  8. 返回【物理周期层级】的合并 DataFrame。
  9. """
  10. project_root = Path(cfg["Paths"]["project_root"])
  11. raw_cfg = cfg["Paths"]["raw_data"]
  12. all_dfs = []
  13. for unit in raw_cfg["enabled_units"]:
  14. file_name = raw_cfg["cycle_file_pattern"].format(unit=unit)
  15. file_path = resolve_path(
  16. project_root,
  17. raw_cfg["filtered_cycles_dir"]
  18. ) / file_name
  19. if not file_path.exists():
  20. raise FileNotFoundError(f"未找到 UF{unit} 数据文件: {file_path}")
  21. df_unit = pd.read_csv(file_path)
  22. df_unit["unit_id"] = unit # 仅用于分组,不进入状态
  23. all_dfs.append(df_unit)
  24. return pd.concat(all_dfs, ignore_index=True)