import os import pandas as pd import yaml # ============================= # 配置加载器 # ============================= class UFConfigLoader: def __init__(self, config_path="config/uf_analyze_config.yaml"): with open(config_path, "r", encoding="utf-8") as f: self.cfg = yaml.safe_load(f) @property def uf(self): return self.cfg["UF"] @property def params(self): return self.cfg["Params"] @property def paths(self): return self.cfg["Paths"] # ============================= # 数据加载器 # ============================= class UFDataLoader: def __init__(self, data_path: str): self.data_path = data_path def load_all_csv(self) -> pd.DataFrame: """读取目录下所有 CSV 并合并成一个 DataFrame""" files = [f for f in os.listdir(self.data_path) if f.endswith(".csv")] dfs = [pd.read_csv(os.path.join(self.data_path, f), parse_dates=["time"]) for f in files] return pd.concat(dfs, ignore_index=True) def load_single_csv(self, file_name: str) -> pd.DataFrame: """读取单个 CSV""" return pd.read_csv(os.path.join(self.data_path, file_name), parse_dates=["time"])