فهرست منبع

1. 添加龙亭动态异常诊断模型
2. 动态异常诊断模型代码整理

zhanghao 3 هفته پیش
والد
کامیت
0eaaae7914

+ 0 - 54
models/Dynamic_anomaly_diagnosis/causal_structure.py

@@ -1,54 +0,0 @@
-# -*- coding: utf-8 -*-
-"""causal_structure.py: 第二层 - 物理因果结构构建"""
-import numpy as np
-import pandas as pd
-from config import config
-
-class CausalStructureBuilder:
-    def __init__(self, threshold_df):
-        self.df = threshold_df
-        self.sensor_list = self.df['ID'].tolist()
-        self.id_to_idx = {name: i for i, name in enumerate(self.sensor_list)}
-        self.num_sensors = len(self.sensor_list)
-        self.col_layer = self._find_col_by_keyword(config.KEYWORD_LAYER)
-        self.col_device = self._find_col_by_keyword(config.KEYWORD_DEVICE)
-
-    def _find_col_by_keyword(self, keyword):
-        if keyword in self.df.columns: return keyword
-        for col in self.df.columns:
-            if col.lower() == keyword.lower(): return col
-        raise ValueError(f"错误: 未找到列名包含 '{keyword}' 的列")
-
-    def build(self):
-        adj_matrix = np.zeros((self.num_sensors, self.num_sensors), dtype=int)
-        nodes_info = {}
-        for _, row in self.df.iterrows():
-            d_val = row[self.col_device]
-            dev_id = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None
-            try: l_val = int(row[self.col_layer])
-            except: l_val = -1
-            nodes_info[row['ID']] = {'layer': l_val, 'device': dev_id}
-            
-        count_edges = 0
-        for i, src_name in enumerate(self.sensor_list):
-            src_node = nodes_info.get(src_name)
-            if not src_node or src_node['layer'] == -1: continue
-            src_l, src_d = src_node['layer'], src_node['device']
-            
-            for j, dst_name in enumerate(self.sensor_list):
-                if i == j: continue 
-                dst_node = nodes_info.get(dst_name)
-                if not dst_node or dst_node['layer'] == -1: continue
-                dst_l, dst_d = dst_node['layer'], dst_node['device']
-                
-                is_layer_valid = (dst_l == src_l) or (dst_l == src_l - 1)
-                if not is_layer_valid: continue
-                    
-                is_dev_valid = True
-                if (src_d is not None) and (dst_d is not None):
-                    if src_d != dst_d: is_dev_valid = False
-                
-                if is_dev_valid:
-                    adj_matrix[i, j] = 1
-                    count_edges += 1
-        return {"sensor_list": self.sensor_list, "sensor_to_idx": self.id_to_idx, "adj_matrix": adj_matrix}

+ 0 - 98
models/Dynamic_anomaly_diagnosis/config.py

@@ -1,98 +0,0 @@
-# -*- coding: utf-8 -*-
-"""config.py: 参数文件"""
-import os
-
-class Config:
-    # ---------------- 路径配置 ----------------
-    # 项目根目录
-    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
-    # 传感器时序数据文件存放目录
-    DATASET_SENSOR_DIR = os.path.join(BASE_DIR, "datasets_xishan")
-    # 训练好的模型权重保存目录
-    MODEL_SAVE_DIR = os.path.join(BASE_DIR, "models")
-    # 最终结果报表保存目录
-    RESULT_SAVE_DIR = os.path.join(BASE_DIR, "results")
-    
-    # 阈值配置文件名 (包含传感器阈值、层级One_layer、设备Device等元数据)
-    THRESHOLD_FILENAME = "sensor_threshold.xlsx"
-    # 专家知识库文件名 (包含已知的历史异常链路)
-    ABNORMAL_LINK_FILENAME = "abnormal_link.xlsx"
-    # 传感器数据文件的命名前缀 (如 data_process_1.csv)
-    SENSOR_FILE_PREFIX = "data_process_"
-    # 最终生成的测试评估报告文件名
-    TEST_RESULT_FILENAME = "Final_Test_Report.xlsx" 
-    
-    # Excel中用于识别关键列的关键字
-    KEYWORD_LAYER = 'One_layer' # 用于构建因果图层级的列名关键字
-    KEYWORD_DEVICE = 'Device'   # 用于设备一致性约束的列名关键字
-    
-    # ---------------- 数据处理参数 ----------------
-    # 要读取的文件编号范围 (例如读取 data_process_1 到 data_process_119)
-    SENSOR_FILE_NUM_RANGE = (1, 119)
-    # 原始数据的采样间隔 (秒)
-    ORIGINAL_SAMPLE_INTERVAL = 4
-    # 降采样后的目标间隔 (秒),用于减少数据量加速计算
-    TARGET_SAMPLE_INTERVAL = 20
-    # 一个检测窗口的时间长度 (分钟)
-    WINDOW_DURATION_MIN = 40
-    # 每个窗口包含的数据点数 = (40分钟 * 60秒) / 20秒 = 120点
-    POINTS_PER_WINDOW = int((WINDOW_DURATION_MIN * 60) / TARGET_SAMPLE_INTERVAL)
-    # 滑动窗口的步长 (点数),设为窗口的一半以增加样本覆盖率
-    WINDOW_STEP = POINTS_PER_WINDOW // 2
-    # 窗口有效性阈值:窗口内非空数据比例需超过此值(60%)才会被处理,否则视为无效窗口
-    VALID_DATA_RATIO = 0.6
-    # 窗口异常判定阈值 (用于判断根因节点的异常程度是否足够高)
-    WINDOW_ANOMALY_THRESHOLD = 0.2
-    # 训练集与测试集的划分比例 (0.8 表示前80%的时间段用于训练,后20%用于测试)
-    TRAIN_TEST_SPLIT = 0.8
-    # 诱发变量的触发阈值
-    TRIGGER_SCORE_THRESH = 0.5
-    
-    # ---------------- 异常量化得分权重与动态MAD参数 ----------------
-    # 绝对阈值异常得分权重
-    ABSOLUTE_SCORE_WEIGHT = 0.6
-    # 动态MAD异常得分权重
-    DYNAMIC_SCORE_WEIGHT = 0.4
-    
-    # 动态MAD滚动窗口大小 (360 = 2小时)
-    MAD_HISTORY_WINDOW = 360
-    # 动态MAD判定阈值 
-    MAD_THRESHOLD = 3.0
-    
-    # ---------------- 诱发变量列表 ----------------
-    # 定义哪些传感器是“诱发变量” 
-    TRIGGER_SENSORS = [
-        "UF1Per", "UF2Per", "UF3Per", "UF4Per",
-        "C.M.RO1_DB@DPT_1", "C.M.RO2_DB@DPT_1", "C.M.RO3_DB@DPT_1", "C.M.RO4_DB@DPT_1",
-        "C.M.RO1_DB@DPT_2", "C.M.RO2_DB@DPT_2", "C.M.RO3_DB@DPT_2", "C.M.RO4_DB@DPT_2",
-        "RO1_CSFlow", "RO2_CSFlow", "RO3_CSFlow", "RO4_CSFlow",
-        "RO1HSL", "RO2HSL", "RO3HSL", "RO4HSL",
-        "RO1_TYL", "RO2_TYL", "RO3_TYL", "RO4_TYL"
-    ]
-
-    # ---------------- 强化学习核心参数 ----------------
-    # 生成的异常链路最小长度限制 (防止路径过短)
-    MIN_PATH_LENGTH = 3
-    # 生成的异常链路最大长度限制 (防止路径过长发散)
-    MAX_PATH_LENGTH = 6
-    
-    # 网络结构参数
-    EMBEDDING_DIM = 64  # 节点的嵌入向量维度
-    HIDDEN_DIM = 256    # 隐藏层维度
-    
-    # PPO (Proximal Policy Optimization) 算法超参数
-    PPO_LR = 3e-4             # 学习率
-    PPO_GAMMA = 0.90          # 折扣因子
-    PPO_EPS_CLIP = 0.2        # PPO更新时的截断范围,防止策略更新幅度过大
-    PPO_K_EPOCHS = 10         # 每次数据采集后,网络更新的循环次数
-    PPO_BATCH_SIZE = 64       # 训练批次大小
-    
-    # 训练轮次设置
-    BC_EPOCHS = 50000         # 行为克隆 (Behavior Cloning) 轮次
-    RL_EPISODES = 5000        # 强化学习 (PPO) 轮次
-    
-    # 自动创建所需的目录结构
-    for d in [MODEL_SAVE_DIR, DATASET_SENSOR_DIR, RESULT_SAVE_DIR]:
-        os.makedirs(d, exist_ok=True)
-
-config = Config()

+ 0 - 269
models/Dynamic_anomaly_diagnosis/data_processing.py

@@ -1,269 +0,0 @@
-# -*- coding: utf-8 -*-
-"""data_processing.py: 第一层 - 异常量化表征 """
-import pandas as pd
-import numpy as np
-import os
-from tqdm import tqdm
-from config import config
-import gc
-from joblib import Parallel, delayed
-import multiprocessing
-
-def _process_single_file_task(file_idx, file_path, sensor_list, target_interval):
-    """
-    单个文件的读取与降采样任务(运行在子进程中)
-    """
-    if not os.path.exists(file_path):
-        return None
-    
-    try:
-        # 1. 快速读取
-        df_temp = pd.read_csv(file_path, index_col=None, low_memory=False)
-        if df_temp.empty:
-            return None
-            
-        # 2. 时间解析 (mixed格式)
-        time_col = df_temp.columns[0]
-        df_temp[time_col] = pd.to_datetime(
-            df_temp[time_col], 
-            format='mixed', 
-            errors='coerce'
-        )
-        df_temp = df_temp.dropna(subset=[time_col]).set_index(time_col)
-        
-        # 3. 筛选列
-        valid_cols = [c for c in df_temp.columns if c in sensor_list]
-        if not valid_cols:
-            return None
-            
-        df_valid = df_temp[valid_cols]
-        
-        # 4. 立即降采样 (4s -> 20s) & 类型转换
-        df_resampled = df_valid.resample(f"{target_interval}s").mean()
-        df_resampled = df_resampled.astype(np.float32)
-        
-        return df_resampled
-        
-    except Exception as e:
-        print(f"文件 {file_idx} 处理出错: {e}")
-        return None
-
-def _calculate_window_chunk(start_indices, values, win_len, valid_ratio, threshold_val=0.95):
-    """
-    窗口计算任务块(运行在子进程中)
-    处理一批窗口的 nanquantile 计算
-    """
-    chunk_results = []
-    
-    for start in start_indices:
-        win_data = values[start : start + win_len, :]
-        
-        # 向量化计算有效性
-        # axis=0 沿时间轴统计
-        valid_counts = np.sum(~np.isnan(win_data), axis=0)
-        valid_ratios = valid_counts / win_len
-        
-        win_res = np.zeros(win_data.shape[1], dtype=np.float32)
-        valid_mask = valid_ratios >= valid_ratio
-        
-        if np.any(valid_mask):
-            # 并行化
-            quantile_scores = np.nanquantile(win_data[:, valid_mask], threshold_val, axis=0)
-            win_res[valid_mask] = quantile_scores
-            
-        chunk_results.append(np.clip(win_res, 0, 1))
-        
-    return chunk_results
-
-class DataAnomalyProcessor:
-    def __init__(self):
-        self.threshold_path = os.path.join(config.BASE_DIR, config.THRESHOLD_FILENAME)
-        self.threshold_df = self._load_thresholds()
-        self.sensor_list = self.threshold_df['ID'].tolist()
-        self.threshold_dict = self.threshold_df.set_index('ID').to_dict('index')
-        
-        # 根据CPU核心数设定并行度
-        self.n_jobs_io = 24  
-        self.n_jobs_cpu = 64 
-        
-    def _load_thresholds(self):
-        """加载阈值表"""
-        try:
-            df = pd.read_excel(self.threshold_path)
-        except Exception as e:
-            raise ValueError(f"读取阈值表失败: {e}")
-
-        required_cols = ['ID', 'Direction', 'Hard_min', 'Hard_max', 'Good_min', 'Good_max']
-        for col in required_cols:
-            if col not in df.columns:
-                for c in df.columns:
-                    if c.lower() == col.lower():
-                        df.rename(columns={c: col}, inplace=True)
-                        break
-        
-        cols_to_numeric = ['Hard_min', 'Hard_max', 'Good_min', 'Good_max', 'Alarm_time']
-        for c in cols_to_numeric:
-            if c in df.columns:
-                df[c] = pd.to_numeric(df[c], errors='coerce')
-        
-        df['Hard_min'] = df['Hard_min'].fillna(-np.inf)
-        df['Hard_max'] = df['Hard_max'].fillna(np.inf)
-        df['Good_min'] = df['Good_min'].fillna(-np.inf)
-        df['Good_max'] = df['Good_max'].fillna(np.inf)
-        df['Alarm_time'] = df['Alarm_time'].fillna(60)
-        return df
-
-    def _calculate_point_score_vectorized(self, series, sensor_name):
-        # 向量化计算逻辑
-        if sensor_name not in self.threshold_dict:
-            return pd.Series(0.0, index=series.index, dtype=np.float32)
-        
-        info = self.threshold_dict[sensor_name]
-        vals = series.astype(np.float32).copy()
-        
-        # 1. 硬阈值掩码(在硬阈值外的数据视为无效/缺失)
-        mask_invalid = (vals < info['Hard_min']) | (vals > info['Hard_max'])
-        vals[mask_invalid] = np.nan
-        
-        # 如果全部为 NaN,直接返回 0
-        if vals.isna().all(): 
-            return pd.Series(0.0, index=vals.index, dtype=np.float32)
-        
-        # ==================== (A) 计算绝对阈值得分 ====================
-        abs_scores = pd.Series(0.0, index=vals.index, dtype=np.float32)
-        direction = str(info['Direction']).strip().lower()
-        
-        if direction in ['low', 'both']:
-            mask_low = (vals < info['Good_min']) & (vals >= info['Hard_min'])
-            denom = info['Good_min'] - info['Hard_min']
-            if denom > 1e-5:
-                abs_scores[mask_low] = (info['Good_min'] - vals[mask_low]) / denom
-            else:
-                abs_scores[mask_low] = 1.0
-                
-        if direction in ['high', 'both']:
-            mask_high = (vals > info['Good_max']) & (vals <= info['Hard_max'])
-            denom = info['Hard_max'] - info['Good_max']
-            if denom > 1e-5:
-                abs_scores[mask_high] = (vals[mask_high] - info['Good_max']) / denom
-            else:
-                abs_scores[mask_high] = 1.0
-
-        alarm_t = max(info['Alarm_time'], 1.0)
-        time_weight = 1.0 + (30.0 / alarm_t) 
-        abs_scores = (abs_scores * time_weight).clip(0, 1)
-        
-        # ==================== (B) 计算动态 MAD 得分 ====================
-        # 使用 rolling 计算滚动窗口内的中位数 (Median)
-        rolling_median = vals.rolling(
-            window=config.MAD_HISTORY_WINDOW, 
-            min_periods=1
-        ).median()
-        
-        # 计算绝对偏差 |x_i - Median|
-        abs_deviation = (vals - rolling_median).abs()
-        
-        # 计算 MAD = Median(|x_i - Median|)
-        rolling_mad = abs_deviation.rolling(
-            window=config.MAD_HISTORY_WINDOW, 
-            min_periods=1
-        ).median()
-        
-        # 计算动态得分:将偏差映射到 0~1 的区间,除以 (MAD_THRESHOLD * MAD)
-        # 加 1e-5 防止除以 0 导致溢出
-        dyn_scores = (abs_deviation / (rolling_mad * config.MAD_THRESHOLD + 1e-5)).clip(0, 1)
-        
-        # ==================== (C) 综合加权得分 ====================
-        final_scores = (config.ABSOLUTE_SCORE_WEIGHT * abs_scores) + (config.DYNAMIC_SCORE_WEIGHT * dyn_scores)
-        
-        # 恢复无效数据的 NaN 状态
-        final_scores[mask_invalid] = np.nan 
-        
-        return final_scores
-
-    def process(self):
-        print(f">>> [Step 1] 数据处理启动 | 检测到 CPU 核心数: {multiprocessing.cpu_count()}")
-        
-        # 1. 并行读取与降采样
-        file_indices = list(range(config.SENSOR_FILE_NUM_RANGE[0], config.SENSOR_FILE_NUM_RANGE[1] + 1))
-        file_paths = [os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}{i}.csv") for i in file_indices]
-        
-        print(f"正在并行读取 {len(file_indices)} 个文件 (并发数: {self.n_jobs_io})...")
-        
-        # 使用 joblib 并行执行 _process_single_file_task
-        results = Parallel(n_jobs=self.n_jobs_io, backend="loky", verbose=5)(
-            delayed(_process_single_file_task)(
-                idx, path, self.sensor_list, config.TARGET_SAMPLE_INTERVAL
-            ) for idx, path in zip(file_indices, file_paths)
-        )
-        
-        # 过滤掉 None 结果
-        all_series_list = [r for r in results if r is not None]
-        
-        if not all_series_list:
-            raise ValueError("没有读取到有效数据")
-
-        # 2. 合并数据
-        print("正在纵向合并数据...")
-        resampled_df = pd.concat(all_series_list, axis=0).sort_index()
-        resampled_df = resampled_df[~resampled_df.index.duplicated(keep='first')]
-        
-        # 释放内存
-        del all_series_list, results
-        gc.collect()
-
-        # 3. 计算点级得分
-        print("计算点级异常得分...")
-        scores_dict = {}
-        for sensor in tqdm(self.sensor_list, desc="计算得分"):
-            if sensor in resampled_df.columns:
-                scores_dict[sensor] = self._calculate_point_score_vectorized(resampled_df[sensor], sensor)
-            else:
-                scores_dict[sensor] = pd.Series(np.nan, index=resampled_df.index, dtype=np.float32)
-        
-        point_score_df = pd.DataFrame(scores_dict)
-        point_score_df = point_score_df[self.sensor_list] # 对齐列顺序
-        
-        # 4. 并行窗口聚合 (计算密集型)
-        print(f"正在并行计算窗口分位数 (并发数: {self.n_jobs_cpu})...")
-        
-        values = point_score_df.values
-        win_len = config.POINTS_PER_WINDOW
-        step = config.WINDOW_STEP
-        total_len = len(values)
-        
-        if total_len < win_len:
-             return np.array([]), np.array([]), self.threshold_df
-        
-        # 生成所有窗口的起始索引
-        all_start_indices = list(range(0, total_len - win_len, step))
-        
-        # 将索引切分为多个 Chunk,每个 Chunk 分配给一个 CPU 核心
-        num_chunks = self.n_jobs_cpu * 4
-        chunk_size = max(1, len(all_start_indices) // num_chunks)
-        chunks = [all_start_indices[i:i + chunk_size] for i in range(0, len(all_start_indices), chunk_size)]
-        
-        # 并行计算
-        chunk_results = Parallel(n_jobs=self.n_jobs_cpu, backend="loky", verbose=5)(
-            delayed(_calculate_window_chunk)(
-                chunk, values, win_len, config.VALID_DATA_RATIO
-            ) for chunk in chunks
-        )
-        
-        # 合并结果
-        window_scores = [item for sublist in chunk_results for item in sublist]
-        
-        final_scores = np.nan_to_num(np.array(window_scores, dtype=np.float32), nan=0.0)
-        
-        # 5. 切分数据集
-        total_samples = final_scores.shape[0]
-        if total_samples == 0:
-            raise ValueError("窗口数量为0")
-            
-        split_idx = int(total_samples * config.TRAIN_TEST_SPLIT)
-        train_scores = final_scores[:split_idx]
-        test_scores = final_scores[split_idx:]
-        
-        print(f"处理完成: 训练集 {len(train_scores)} | 测试集 {len(test_scores)}")
-        
-        return train_scores, test_scores, self.threshold_df

+ 0 - 158
models/Dynamic_anomaly_diagnosis/input_format.txt

@@ -1,158 +0,0 @@
-index
-C.M.FT_ZJS@out
-C.M.LT_JSC@out
-UF_bump1_n
-UF_bump2_n
-UF_bump3_n
-UF_bump4_n
-C.M.UF_GSB1_fre@out
-C.M.UF_GSB2_fre@out
-C.M.UF_GSB3_fre@out
-C.M.UF_GSB4_fre@out
-C.M.UF_GSB1_A@out
-C.M.UF_GSB2_A@out
-C.M.UF_GSB3_A@out
-C.M.UF_GSB4_A@out
-UF_bump_avg
-water_in
-C.M.UF_PT_ZJS@out
-C.M.UF_Tur_ZJS@out
-C.M.RO_TT_ZJS@out
-PFTMP
-C.M.UF1_FT_JS@out
-C.M.UF2_FT_JS@out
-C.M.UF3_FT_JS@out
-C.M.UF4_FT_JS@out
-C.M.UF1_JSF_kd@out
-C.M.UF2_JSF_kd@out
-C.M.UF3_JSF_kd@out
-C.M.UF4_JSF_kd@out
-C.M.UF1_PT_JS@out
-C.M.UF2_PT_JS@out
-C.M.UF3_PT_JS@out
-C.M.UF4_PT_JS@out
-UF1_FluxF
-UF2_FluxF
-UF3_FluxF
-UF4_FluxF
-UF1Per
-UF2Per
-UF3Per
-UF4Per
-C.M.UF1_DB@press_PV
-C.M.UF2_DB@press_PV
-C.M.UF3_DB@press_PV
-C.M.UF4_DB@press_PV
-C.M.UF1_PT_CS@out
-C.M.UF2_PT_CS@out
-C.M.UF3_PT_CS@out
-C.M.UF4_PT_CS@out
-C.M.UF_PT_ZCS@out
-C.M.UF_FT_ZCS@out
-C.M.UF_Cl_ZCS@out
-C.M.UF_ORP_ZCS@out
-C.M.UF_PH_ZCS@out
-C.M.UF_Tur_ZCS@out
-RO_TotalFlow
-C.M.RO_Cond_ZJS@out
-C.M.RO_ORP_ZJS@out
-C.M.RO_PH_ZJS@out
-C.M.LT_HYJ1@out
-C.M.LT_HYJ2@out
-C.M.LT_ZGJ@out
-C.M.LT_SJJ@out
-RO1_GYB_n
-RO2_GYB_n
-RO3_GYB_n
-RO4_GYB_n
-C.M.RO1_GYB_fre@out
-C.M.RO2_GYB_fre@out
-C.M.RO3_GYB_fre@out
-C.M.RO4_GYB_fre@out
-C.M.RO1_GYB_A@out
-C.M.RO2_GYB_A@out
-C.M.RO3_GYB_A@out
-C.M.RO4_GYB_A@out
-C.M.RO1_GYBF_kd@out
-C.M.RO2_GYBF_kd@out
-C.M.RO3_GYBF_kd@out
-C.M.RO4_GYBF_kd@out
-C.M.RO1_FT_JS@out
-C.M.RO2_FT_JS@out
-C.M.RO3_FT_JS@out
-C.M.RO4_FT_JS@out
-C.M.RO1_PT_JS@out
-C.M.RO2_PT_JS@out
-C.M.RO3_PT_JS@out
-C.M.RO4_PT_JS@out
-C.M.RO1_DB@DPT_1
-C.M.RO2_DB@DPT_1
-C.M.RO3_DB@DPT_1
-C.M.RO4_DB@DPT_1
-C.M.RO1_PT_DJ1@out
-C.M.RO2_PT_DJ1@out
-C.M.RO3_PT_DJ1@out
-C.M.RO4_PT_DJ1@out
-RO1_DJB_n
-RO2_DJB_n
-RO3_DJB_n
-RO4_DJB_n
-C.M.RO1_DJB_fre@out
-C.M.RO2_DJB_fre@out
-C.M.RO3_DJB_fre@out
-C.M.RO4_DJB_fre@out
-C.M.RO1_DJB_A@out
-C.M.RO2_DJB_A@out
-C.M.RO3_DJB_A@out
-C.M.RO4_DJB_A@out
-C.M.RO1_PT_DJ2@out
-C.M.RO2_PT_DJ2@out
-C.M.RO3_PT_DJ2@out
-C.M.RO4_PT_DJ2@out
-C.M.RO1_DB@DPT_2
-C.M.RO2_DB@DPT_2
-C.M.RO3_DB@DPT_2
-C.M.RO4_DB@DPT_2
-C.M.RO1_PT_NS@out
-C.M.RO2_PT_NS@out
-C.M.RO3_PT_NS@out
-C.M.RO4_PT_NS@out
-C.M.RO1_FT_CS2@out
-C.M.RO2_FT_CS2@out
-C.M.RO3_FT_CS2@out
-C.M.RO4_FT_CS2@out
-RO1_SPK
-RO2_SPK
-RO3_SPK
-RO4_SPK
-C.M.RO1_FT_NS@out
-C.M.RO2_FT_NS@out
-C.M.RO3_FT_NS@out
-C.M.RO4_FT_NS@out
-RO1_CSFlow
-RO2_CSFlow
-RO3_CSFlow
-RO4_CSFlow
-RO1_FluxF
-RO2_FluxF
-RO3_FluxF
-RO4_FluxF
-C.M.RO1_PT_CS@out
-C.M.RO2_PT_CS@out
-C.M.RO3_PT_CS@out
-C.M.RO4_PT_CS@out
-C.M.RO1_Cond_CS@out
-C.M.RO2_Cond_CS@out
-C.M.RO3_Cond_CS@out
-C.M.RO4_Cond_CS@out
-RO1HSL
-RO2HSL
-RO3HSL
-RO4HSL
-RO1_TYL
-RO2_TYL
-RO3_TYL
-RO4_TYL
-C.M.RO_PT_ZCS@out
-RO_TCHFlow
-C.M.RO_Cond_ZCS@out

+ 0 - 32
models/Dynamic_anomaly_diagnosis/main.py

@@ -1,32 +0,0 @@
-# -*- coding: utf-8 -*-
-"""main.py: 主运行文件"""
-from data_processing import DataAnomalyProcessor
-from causal_structure import CausalStructureBuilder
-from rl_tracing import RLTrainer
-
-def main():
-    
-    # 1. 数据层 (返回切分好的数据)
-    processor = DataAnomalyProcessor()
-    train_scores, test_scores, threshold_df = processor.process()
-    
-    # 2. 因果层
-    builder = CausalStructureBuilder(threshold_df)
-    causal_graph = builder.build()
-    
-    # 3. 强化学习层
-    # 初始化传入训练集
-    trainer = RLTrainer(causal_graph, train_scores, threshold_df)
-    
-    # 3.1 训练阶段
-    trainer.pretrain_bc()   # 学习已有的
-    trainer.train_ppo()     # 探索未知的
-    trainer.save_model()
-    
-    # 3.2 评估阶段 (使用测试集)
-    trainer.evaluate(test_scores)
-    
-    print("\n[Success] 所有任务执行完毕!")
-
-if __name__ == "__main__":
-    main()

BIN
models/Dynamic_anomaly_diagnosis/models/ppo_tracing_model.pth


+ 0 - 44
models/Dynamic_anomaly_diagnosis/output_format.txt

@@ -1,44 +0,0 @@
-输入按照input_format.txt文件中的变量输入(跟原来一样,不变),输入为过去2h的数据(1800条数据),40min的数据也能正常运行,但是动态范围计算可能存在问题
-输出分为三种情况:输入数据时间不足(至少大于40min);无异常;存在异常,给出异常的诱发变量,异常路径和根因变量
-{'status': 'warning', 'message': '数据时长不足: 6.6min < 40min'}
-{'status': 'normal', 'message': '系统运行正常'}
-{
-  "status": "abnormal",
-  "results": [
-    {
-      "trigger": "RO3_TYL",
-      "path": "RO3_TYL -> C.M.RO3_Cond_CS@out -> C.M.RO3_PT_NS@out -> C.M.RO3_PT_DJ1@out",
-      "root_cause": "C.M.RO3_PT_DJ1@out",
-      "details": [
-        {
-          "node": "RO3_TYL",
-          "name": "RO3脱盐率",
-          "anomaly_score": 0.5178,
-          "is_abnormal": true,
-          "deviation": "当前值: 94.78 | 物理工况: 偏低 2.8% (物理允许下限: 97.5) | 动态趋势: 平稳波动 (近期基线: 94.73, 动态区间: [94.59, 94.87])"
-        },
-        {
-          "node": "C.M.RO3_Cond_CS@out",
-          "name": "RO3产水电导",
-          "anomaly_score": 0.4,
-          "is_abnormal": true,
-          "deviation": "当前值: 106.80 | 物理工况: 正常 (物理范围: [10.0, 250.0]) | 动态趋势: 平稳波动 (近期基线: 107.43, 动态区间: [105.12, 109.73])"
-        },
-        {
-          "node": "C.M.RO3_PT_NS@out",
-          "name": "RO3二段浓水压力",
-          "anomaly_score": 0.4,
-          "is_abnormal": true,
-          "deviation": "当前值: 0.96 | 物理工况: 正常 (物理范围: [0.5, 1.05]) | 动态趋势: 平稳波动 (近期基线: 0.96, 动态区间: [0.96, 0.96])"
-        },
-        {
-          "node": "C.M.RO3_PT_DJ1@out",
-          "name": "RO3一段浓水压力",
-          "anomaly_score": 0.4,
-          "is_abnormal": true,
-          "deviation": "当前值: 0.90 | 物理工况: 正常 (物理范围: [0.02, 1.0]) | 动态趋势: 平稳波动 (近期基线: 0.89, 动态区间: [0.89, 0.90])"
-        }
-      ]
-    }
-  ]
-}

+ 0 - 443
models/Dynamic_anomaly_diagnosis/rl_tracing.py

@@ -1,443 +0,0 @@
-# -*- coding: utf-8 -*-
-"""rl_tracing.py: 强化学习链路级异常溯源"""
-import torch
-import torch.nn as nn
-import torch.optim as optim
-import torch.nn.functional as F
-from torch.distributions import Categorical
-import numpy as np
-import pandas as pd
-import os
-from tqdm import tqdm
-from config import config
-
-# ----------------- 1. 环境 -----------------
-class CausalTracingEnv:
-    def __init__(self, causal_graph, window_scores, threshold_df, expert_knowledge=None):
-        self.sensor_list = causal_graph['sensor_list']
-        self.map = causal_graph['sensor_to_idx']
-        self.idx_to_sensor = {v: k for k, v in self.map.items()}
-        self.adj = causal_graph['adj_matrix']
-        self.scores = window_scores
-        
-        self.expert_knowledge = expert_knowledge if expert_knowledge else {}
-        self.num_sensors = len(self.sensor_list)
-        
-        # 解析属性
-        self.node_props = {} 
-        col_one_layer = self._find_col(threshold_df, config.KEYWORD_LAYER)
-        col_device = self._find_col(threshold_df, config.KEYWORD_DEVICE)
-        
-        df_indexed = threshold_df.set_index('ID')
-        dict_one = df_indexed[col_one_layer].to_dict() if col_one_layer else {}
-        dict_dev = df_indexed[col_device].to_dict() if col_device else {}
-        
-        for name, idx in self.map.items():
-            l_val = dict_one.get(name, -1)
-            try: l_val = int(l_val)
-            except: l_val = 0 
-            d_val = dict_dev.get(name, None)
-            d_val = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None
-            self.node_props[idx] = {'one_layer': l_val, 'device': d_val}
-
-        self.current_window_idx = 0
-        self.current_node_idx = 0
-        self.prev_node_idx = 0
-        self.trigger_node_idx = 0
-        self.path = []
-        self.current_expert_paths = []
-        self.target_roots = set()
-        
-    def _find_col(self, df, keyword):
-        if keyword in df.columns: return keyword
-        for c in df.columns:
-            if c.lower() == keyword.lower(): return c
-        return None
-
-    def reset(self, force_window_idx=None, force_trigger=None):
-        if force_window_idx is not None:
-            self.current_window_idx = force_window_idx
-            t_name = force_trigger
-        else:
-            found = False
-            for _ in range(100):
-                w_idx = np.random.randint(len(self.scores))
-                win_scores = self.scores[w_idx]
-                candidates = []
-                for t_name in config.TRIGGER_SENSORS:
-                    if t_name in self.map:
-                        idx = self.map[t_name]
-                        if win_scores[idx] > config.TRIGGER_SCORE_THRESH:
-                            candidates.append(t_name)
-                if candidates:
-                    self.current_window_idx = w_idx
-                    t_name = np.random.choice(candidates)
-                    found = True
-                    break
-            if not found:
-                self.current_window_idx = np.random.randint(len(self.scores))
-                t_name = list(self.map.keys())[0]
-
-        self.current_node_idx = self.map.get(t_name, 0)
-        self.trigger_node_idx = self.current_node_idx
-        self.prev_node_idx = self.current_node_idx
-        self.path = [self.current_node_idx]
-        
-        self.target_roots = set()
-        self.current_expert_paths = []
-        if self.current_node_idx in self.expert_knowledge:
-            entry = self.expert_knowledge[self.current_node_idx]
-            self.target_roots = entry['roots']
-            self.current_expert_paths = entry['paths']
-            
-        return self._get_state()
-    
-    def _get_state(self):
-        curr_score = self.scores[self.current_window_idx, self.current_node_idx]
-        prev_score = self.scores[self.current_window_idx, self.prev_node_idx]
-        curr_layer = self.node_props[self.current_node_idx]['one_layer'] / 20.0
-        
-        return (
-            torch.LongTensor([self.current_node_idx, self.prev_node_idx, self.trigger_node_idx]), 
-            torch.FloatTensor([curr_score, prev_score, curr_layer])
-        )
-    
-    def get_valid_actions(self, curr_idx):
-        neighbors = np.where(self.adj[curr_idx] == 1)[0]
-        curr_props = self.node_props[curr_idx]
-        curr_l, curr_d = curr_props['one_layer'], curr_props['device']
-        valid = []
-        for n in neighbors:
-            if n in self.path: continue 
-            tgt_props = self.node_props[n]
-            tgt_l, tgt_d = tgt_props['one_layer'], tgt_props['device']
-            if curr_l != 0 and tgt_l != 0:
-                if not ((tgt_l == curr_l) or (tgt_l == curr_l - 1)): continue
-            if (curr_d is not None) and (tgt_d is not None):
-                if curr_d != tgt_d: continue
-            valid.append(n)
-        return np.array(valid)
-    
-    def step(self, action_idx):
-        prev = self.current_node_idx
-        self.prev_node_idx = prev
-        self.current_node_idx = action_idx
-        self.path.append(action_idx)
-        
-        score_curr = self.scores[self.current_window_idx, self.current_node_idx]
-        
-        reward = 0.0
-        done = False
-        
-        # 奖励机制 (Imitation > Root > Gradient)
-        in_expert_nodes = False
-        for e_path in self.current_expert_paths:
-            if action_idx in e_path:
-                in_expert_nodes = True
-                break
-        
-        if in_expert_nodes: reward += 2.0
-        else: reward -= 0.2
-            
-        if action_idx in self.target_roots:
-            reward += 10.0
-            done = True
-        
-        score_prev = self.scores[self.current_window_idx, prev]
-        diff = score_curr - score_prev
-        if diff > 0: reward += diff * 3.0
-        else: reward -= 0.5
-            
-        if len(self.path) >= config.MAX_PATH_LENGTH:
-            done = True
-            if action_idx not in self.target_roots: reward -= 5.0
-            
-        if score_curr < 0.15 and len(self.path) > 3:
-            done = True
-            reward -= 2.0
-            
-        return self._get_state(), reward, done, {}
-
-# ----------------- 2. 网络 -----------------
-class TargetDrivenActorCritic(nn.Module):
-    def __init__(self, num_sensors, embedding_dim=64, hidden_dim=256):
-        super().__init__()
-        self.node_emb = nn.Embedding(num_sensors, embedding_dim)
-        input_dim = (embedding_dim * 3) + 3 
-        
-        self.shared_net = nn.Sequential(
-            nn.Linear(input_dim, hidden_dim),
-            nn.ReLU(),
-            nn.LayerNorm(hidden_dim),
-            nn.Linear(hidden_dim, hidden_dim),
-            nn.ReLU()
-        )
-        self.actor = nn.Linear(hidden_dim, num_sensors)
-        self.critic = nn.Linear(hidden_dim, 1)
-        
-    def forward(self, int_data, float_data):
-        curr_emb = self.node_emb(int_data[:, 0])
-        prev_emb = self.node_emb(int_data[:, 1])
-        trig_emb = self.node_emb(int_data[:, 2]) 
-        x = torch.cat([curr_emb, prev_emb, trig_emb, float_data], dim=1)
-        feat = self.shared_net(x)
-        return self.actor(feat), self.critic(feat)
-
-# ----------------- 3. 训练器 -----------------
-class RLTrainer:
-    def __init__(self, causal_graph, train_scores, threshold_df):
-        self.sensor_map = causal_graph['sensor_to_idx']
-        self.idx_to_sensor = {v: k for k, v in self.sensor_map.items()}
-        self.threshold_df = threshold_df
-        self.causal_graph = causal_graph
-        self.expert_knowledge, self.bc_samples, _ = self._load_expert_data()
-        
-        self.env = CausalTracingEnv(causal_graph, train_scores, threshold_df, self.expert_knowledge)
-        self.model = TargetDrivenActorCritic(self.env.num_sensors, config.EMBEDDING_DIM, config.HIDDEN_DIM)
-        self.optimizer = optim.Adam(self.model.parameters(), lr=config.PPO_LR)
-        
-    def _load_expert_data(self):
-        path = os.path.join(config.BASE_DIR, config.ABNORMAL_LINK_FILENAME)
-        kb_data = {} 
-        bc_data = [] 
-        if not os.path.exists(path): return kb_data, bc_data, None
-        
-        df = pd.read_excel(path)
-        for _, row in df.iterrows():
-            link = str(row.get('Link Path', ''))
-            if not link: continue
-            nodes_str = [n.strip() for n in link.replace('→', '->').split('->')]
-            path_nodes = nodes_str[::-1]
-            
-            ids = []
-            valid = True
-            for n in path_nodes:
-                if n in self.sensor_map: ids.append(self.sensor_map[n])
-                else: valid = False; break
-            if not valid or len(ids)<2: continue
-            
-            trigger_id = ids[0]
-            root_id = ids[-1]
-            
-            if trigger_id not in kb_data:
-                kb_data[trigger_id] = {'paths': [], 'roots': set(), 'logic': row.get('Process Logic Basis', '')}
-            kb_data[trigger_id]['paths'].append(ids)
-            kb_data[trigger_id]['roots'].add(root_id)
-            
-            for i in range(len(ids) - 1):
-                curr = ids[i]
-                prev = ids[max(0, i-1)]
-                nxt = ids[i+1]
-                bc_data.append(((curr, prev, trigger_id), nxt))
-                
-        return kb_data, bc_data, df
-
-    def pretrain_bc(self):
-        if not self.bc_samples: return
-        print(f"\n>>> [Step 3.1] 启动BC预训练 ({config.BC_EPOCHS}轮)...")
-        states_int = torch.LongTensor([list(s) for s, a in self.bc_samples])
-        actions = torch.LongTensor([a for s, a in self.bc_samples])
-        states_float = torch.zeros((len(states_int), 3))
-        states_float[:, 0] = 0.9 
-        states_float[:, 1] = 0.8
-        
-        loss_fn = nn.CrossEntropyLoss()
-        pbar = tqdm(range(config.BC_EPOCHS), desc="BC Training")
-        for epoch in pbar:
-            logits, _ = self.model(states_int, states_float)
-            loss = loss_fn(logits, actions)
-            self.optimizer.zero_grad()
-            loss.backward()
-            self.optimizer.step()
-            if epoch%100==0: pbar.set_postfix({'Loss': f"{loss.item():.4f}"})
-
-    def train_ppo(self):
-        print(f"\n>>> [Step 3.2] 启动PPO训练 ({config.RL_EPISODES}轮)...")
-        pbar = tqdm(range(config.RL_EPISODES), desc="PPO Training")
-        rewards_hist = []
-        for _ in pbar:
-            state_data = self.env.reset()
-            done = False
-            ep_r = 0
-            b_int, b_float, b_act, b_lp, b_rew, b_mask = [], [], [], [], [], []
-            
-            while not done:
-                s_int = state_data[0].unsqueeze(0)
-                s_float = state_data[1].unsqueeze(0)
-                valid = self.env.get_valid_actions(s_int[0, 0].item())
-                if len(valid) == 0: break
-                
-                logits, _ = self.model(s_int, s_float)
-                mask = torch.full_like(logits, -1e9)
-                mask[0, valid] = 0
-                dist = Categorical(F.softmax(logits+mask, dim=-1))
-                action = dist.sample()
-                
-                next_s, r, done, _ = self.env.step(action.item())
-                b_int.append(s_int); b_float.append(s_float)
-                b_act.append(action); b_lp.append(dist.log_prob(action))
-                b_rew.append(r); b_mask.append(1-done)
-                state_data = next_s
-                ep_r += r
-            
-            if len(b_rew) > 1:
-                self._update_ppo(b_int, b_float, b_act, b_lp, b_rew, b_mask)
-            
-            rewards_hist.append(ep_r)
-            if len(rewards_hist)>50: rewards_hist.pop(0)
-            pbar.set_postfix({'AvgR': f"{np.mean(rewards_hist):.2f}"})
-
-    def _update_ppo(self, b_int, b_float, b_act, b_lp, b_rew, b_mask):
-        returns = []
-        R = 0
-        for r, m in zip(reversed(b_rew), reversed(b_mask)):
-            R = r + config.PPO_GAMMA * R * m
-            returns.insert(0, R)
-        returns = torch.tensor(returns)
-        
-        if returns.numel() > 1 and returns.std() > 1e-5:
-            returns = (returns - returns.mean()) / (returns.std() + 1e-5)
-        elif returns.numel() > 1:
-             returns = returns - returns.mean()
-        
-        s_int = torch.cat(b_int)
-        s_float = torch.cat(b_float)
-        act = torch.stack(b_act)
-        old_lp = torch.stack(b_lp).detach()
-        
-        for _ in range(config.PPO_K_EPOCHS):
-            logits, vals = self.model(s_int, s_float)
-            dist = Categorical(logits=logits)
-            new_lp = dist.log_prob(act)
-            ratio = torch.exp(new_lp - old_lp)
-            
-            surr1 = ratio * returns
-            surr2 = torch.clamp(ratio, 1-config.PPO_EPS_CLIP, 1+config.PPO_EPS_CLIP) * returns
-            
-            v_pred = vals.squeeze()
-            if v_pred.shape != returns.shape:
-                v_pred = v_pred.view(-1)
-                returns = returns.view(-1)
-                
-            loss = -torch.min(surr1, surr2).mean() + 0.5 * F.mse_loss(v_pred, returns)
-            self.optimizer.zero_grad()
-            loss.backward()
-            self.optimizer.step()
-
-    def evaluate(self, test_scores):
-        print("\n>>> [Step 4] 评估测试集...")
-        self.model.eval()
-        results = []
-        
-        cnt_detected = 0
-        cnt_kb_covered = 0
-        cnt_path_match = 0
-        cnt_root_match = 0
-        cnt_new = 0
-        
-        env = CausalTracingEnv(self.causal_graph, test_scores, self.threshold_df, self.expert_knowledge)
-        
-        for win_idx in range(len(test_scores)):
-            scores = test_scores[win_idx]
-            active = []
-            for t_name in config.TRIGGER_SENSORS:
-                if t_name in self.sensor_map:
-                    idx = self.sensor_map[t_name]
-                    if scores[idx] > config.TRIGGER_SCORE_THRESH:
-                        active.append((t_name, idx))
-            
-            for t_name, t_idx in active:
-                cnt_detected += 1
-                state_data = env.reset(force_window_idx=win_idx, force_trigger=t_name)
-                path_idxs = [t_idx]
-                done = False
-                
-                while not done:
-                    s_int = state_data[0].unsqueeze(0)
-                    s_float = state_data[1].unsqueeze(0)
-                    valid = env.get_valid_actions(path_idxs[-1])
-                    if len(valid) == 0: break
-                    
-                    logits, _ = self.model(s_int, s_float)
-                    mask = torch.full_like(logits, -1e9)
-                    mask[0, valid] = 0
-                    act = torch.argmax(logits + mask, dim=1).item()
-                    state_data, _, done, _ = env.step(act)
-                    path_idxs.append(act)
-                    if len(path_idxs) >= config.MAX_PATH_LENGTH: done = True
-                
-                path_names = [self.idx_to_sensor[i] for i in path_idxs]
-                root = path_names[-1]
-                root_score = scores[self.sensor_map[root]]
-                
-                match_status = "未定义"
-                logic = ""
-                
-                if t_idx in self.expert_knowledge:
-                    cnt_kb_covered += 1
-                    entry = self.expert_knowledge[t_idx]
-                    logic = entry.get('logic', '')
-                    
-                    real_roots = [self.idx_to_sensor[r] for r in entry['roots']]
-                    rm = False
-                    for p_node in path_names:
-                        if p_node in real_roots:
-                            rm = True
-                            break
-                    
-                    pm = False
-                    path_set = set(path_idxs)
-                    for exp_p in entry['paths']:
-                        exp_set = set(exp_p)
-                        intersection = len(path_set.intersection(exp_set))
-                        union = len(path_set.union(exp_set))
-                        if union > 0 and (intersection / union) >= 0.6:
-                            pm = True
-                            break
-                    
-                    if pm:
-                        match_status = "路径吻合"
-                        cnt_path_match += 1
-                        cnt_root_match += 1
-                    elif rm:
-                        match_status = "仅根因吻合"
-                        cnt_root_match += 1
-                    else:
-                        match_status = "不吻合"
-                else:
-                    match_status = "新链路"
-                    cnt_new += 1
-                
-                results.append({
-                    "窗口ID": win_idx,
-                    "诱发变量": t_name,
-                    "溯源路径": "->".join(path_names),
-                    "根因变量": root,
-                    "根因异常分": f"{root_score:.3f}",
-                    "是否知识库": "是" if t_idx in self.expert_knowledge else "否",
-                    "匹配情况": match_status,
-                    "机理描述": logic
-                })
-
-        denom = max(cnt_kb_covered, 1)
-        summary = [
-            {"指标": "检测到的总异常样本数", "数值": cnt_detected},
-            {"指标": "知识库覆盖的样本数", "数值": cnt_kb_covered},
-            {"指标": "异常链路准确率", "数值": f"{cnt_path_match/denom:.2%}"},
-            {"指标": "根因准确率", "数值": f"{cnt_root_match/denom:.2%}"},
-            {"指标": "新发现异常模式数", "数值": cnt_new}
-        ]
-        
-        save_path = os.path.join(config.RESULT_SAVE_DIR, config.TEST_RESULT_FILENAME)
-        with pd.ExcelWriter(save_path, engine='openpyxl') as writer:
-            pd.DataFrame(summary).to_excel(writer, sheet_name='Sheet1_概览指标', index=False)
-            pd.DataFrame(results).to_excel(writer, sheet_name='Sheet2_测试集详情', index=False)
-            
-        print("\n" + "="*50)
-        print(pd.DataFrame(summary).to_string(index=False))
-        print(f"\n文件已保存: {save_path}")
-        print("="*50)
-
-    def save_model(self):
-        path = os.path.join(config.MODEL_SAVE_DIR, "ppo_tracing_model.pth")
-        torch.save(self.model.state_dict(), path)

+ 0 - 291
models/Dynamic_anomaly_diagnosis/test.py

@@ -1,291 +0,0 @@
-# -*- coding: utf-8 -*-
-"""test.py: 在线诊断接口"""
-import os
-import pandas as pd
-import numpy as np
-import torch
-from config import config
-from data_processing import DataAnomalyProcessor
-from causal_structure import CausalStructureBuilder
-from rl_tracing import RLTrainer, CausalTracingEnv
-
-class WaterPlantDiagnoser:
-    def __init__(self):
-        
-        # 1. 初始化数据处理器 (用于加载阈值和计算得分,异常表征逻辑与训练完全一致)
-        self.processor = DataAnomalyProcessor()
-        self.sensor_list = self.processor.sensor_list
-        self.threshold_df = self.processor.threshold_df
-        
-        # 2. 构建因果图 (Layer 2)
-        self.builder = CausalStructureBuilder(self.threshold_df)
-        self.causal_graph = self.builder.build()
-        
-        # 3. 加载强化学习模型 (Layer 3)
-        dummy_scores = np.zeros((1, len(self.sensor_list)), dtype=np.float32)
-        self.trainer = RLTrainer(self.causal_graph, dummy_scores, self.threshold_df)
-        
-        model_path = os.path.join(config.MODEL_SAVE_DIR, "ppo_tracing_model.pth")
-        if not os.path.exists(model_path):
-            print(f"[Warning] 未找到模型文件: {model_path}。如果是测试环境,请确保已有预训练模型。")
-        else:
-            state_dict = torch.load(model_path, map_location=torch.device('cpu'), weights_only=True)
-            self.trainer.model.load_state_dict(state_dict)
-        
-        self.trainer.model.eval()
-        
-    def _preprocess_dataframe(self, df_input):
-        """
-        严格复现数据处理逻辑
-        """
-        try:
-            df = df_input.copy()
-            # 1. 时间解析
-            time_col = df.columns[0]
-            df[time_col] = pd.to_datetime(df[time_col], format='mixed', errors='coerce')
-            df = df.dropna(subset=[time_col]).set_index(time_col)
-            
-            # 2. 筛选列
-            valid_cols = [c for c in df.columns if c in self.sensor_list]
-            if not valid_cols: return None
-            df_valid = df[valid_cols]
-            
-            # 3. 降采样 (4s -> 20s)
-            df_resampled = df_valid.resample(f"{config.TARGET_SAMPLE_INTERVAL}s").mean()
-            df_resampled = df_resampled.astype(np.float32)
-            
-            return df_resampled
-        except Exception as e:
-            print(f"数据预处理错误: {e}")
-            return None
-
-    def api_predict(self, df_raw):
-        """
-        对外接口: 每次输入过去2小时的数据,取最新的40分钟进行诊断
-        """
-        # --- Layer 1: 数据预处理 ---
-        df_resampled = self._preprocess_dataframe(df_raw)
-        if df_resampled is None or df_resampled.empty:
-            return {"status": "error", "msg": "数据预处理失败或数据为空"}
-            
-        # --- Layer 1: 异常得分计算 (包含绝对+MAD综合得分) ---
-        scores_dict = {}
-        for sensor in self.sensor_list:
-            if sensor in df_resampled.columns:
-                scores_dict[sensor] = self.processor._calculate_point_score_vectorized(
-                    df_resampled[sensor], sensor
-                )
-            else:
-                # 严格对齐: 使用 np.nan 保持与训练完全一致
-                scores_dict[sensor] = pd.Series(np.nan, index=df_resampled.index, dtype=np.float32)
-        
-        point_score_df = pd.DataFrame(scores_dict)[self.sensor_list]
-        
-        # --- Layer 1: 提取最新40分钟进行窗口聚合与原始数据均值计算 ---
-        req_points = config.POINTS_PER_WINDOW  # 40分钟 = 120个点 (20s间隔)
-        total_points = len(point_score_df)
-        
-        if total_points < req_points:
-            return {
-                "status": "warning", 
-                "message": f"传入数据不足40分钟 (当前: {total_points*20}s, 需要: {req_points*20}s)"
-            }
-            
-        # 切片:取最后40分钟的数据 (用于计算异常分) 和 原始数据 (用于计算偏差百分比)
-        latest_40min_scores = point_score_df.iloc[-req_points:].values
-        latest_40min_raw = df_resampled.iloc[-req_points:]
-        # 计算这40分钟内原始传感器的均值,作为业务展示的参考值
-        real_time_values = latest_40min_raw.mean(skipna=True)
-        
-        # 计算有效比例和 95 分位数
-        valid_counts = np.sum(~np.isnan(latest_40min_scores), axis=0)
-        valid_ratios = valid_counts / req_points
-        
-        current_window_scores = np.zeros(latest_40min_scores.shape[1], dtype=np.float32)
-        valid_mask = valid_ratios >= config.VALID_DATA_RATIO
-        
-        if np.any(valid_mask):
-            current_window_scores[valid_mask] = np.nanquantile(latest_40min_scores[:, valid_mask], 0.95, axis=0)
-        
-        current_window_scores = np.clip(current_window_scores, 0, 1)
-        
-        # --- Layer 2 & 3: 触发检测与溯源 ---
-        results = []
-        active_triggers = []
-        
-        for t_name in config.TRIGGER_SENSORS:
-            if t_name in self.causal_graph['sensor_to_idx']:
-                idx = self.causal_graph['sensor_to_idx'][t_name]
-                score = current_window_scores[idx]
-                if score > config.TRIGGER_SCORE_THRESH:
-                    active_triggers.append((t_name, idx, score))
-
-        if not active_triggers:
-            return {
-                "status": "normal", 
-                "message": "系统运行正常", 
-            }
-
-        # 构建临时环境进行溯源
-        env_scores = current_window_scores.reshape(1, -1)
-        temp_env = CausalTracingEnv(self.causal_graph, env_scores, self.threshold_df, self.trainer.expert_knowledge)
-        
-        for t_name, t_idx, t_score in active_triggers:
-            state_data = temp_env.reset(force_window_idx=0, force_trigger=t_name)
-            path_idxs = [t_idx]
-            done = False
-            
-            while not done:
-                s_int = state_data[0].unsqueeze(0)
-                s_float = state_data[1].unsqueeze(0)
-                valid = temp_env.get_valid_actions(path_idxs[-1])
-                
-                if len(valid) == 0: break
-                
-                logits, _ = self.trainer.model(s_int, s_float)
-                mask = torch.full_like(logits, -1e9)
-                mask[0, valid] = 0
-                act = torch.argmax(logits + mask, dim=1).item()
-                
-                state_data, _, done, _ = temp_env.step(act)
-                path_idxs.append(act)
-                if len(path_idxs) >= config.MAX_PATH_LENGTH: done = True
-            
-            path_names = [self.trainer.idx_to_sensor[i] for i in path_idxs]
-            
-            # --- 1 & 2:计算链路有效性与双重偏差量化 ---
-            path_details = []
-            abnormal_other_nodes_count = 0
-            
-            for node_idx, node_name in zip(path_idxs, path_names):
-                node_score = current_window_scores[node_idx]
-                is_node_abnormal = node_score > config.WINDOW_ANOMALY_THRESHOLD
-                
-                # 若不是诱发变量本身且达到异常标准,计数+1
-                if node_idx != t_idx and is_node_abnormal:
-                    abnormal_other_nodes_count += 1
-                
-                # 1. 获取当前参考值
-                raw_val = real_time_values.get(node_name, np.nan)
-                
-                # 2. 实时回溯计算该节点的动态基线与上下限
-                dyn_lower, dyn_upper, dyn_med = np.nan, np.nan, np.nan
-                if node_name in df_resampled.columns:
-                    vals = df_resampled[node_name].astype(np.float32)
-                    # 模拟滑动窗口还原 MAD 的历史状态
-                    r_med = vals.rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()
-                    r_mad = (vals - r_med).abs().rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()
-                    
-                    # 提取时刻末尾的值作为基线
-                    last_med = r_med.iloc[-1]
-                    last_mad = r_mad.iloc[-1]
-                    dyn_lower = last_med - config.MAD_THRESHOLD * last_mad
-                    dyn_upper = last_med + config.MAD_THRESHOLD * last_mad
-                    dyn_med = last_med
-
-                # 3. 分别构建【物理范围】与【动态范围】的显示文字
-                phy_str = "未定义"
-                dyn_str = "未定义"
-                node_name_zh = "未知"
-                
-                # 提取中文名称
-                if node_name in self.processor.threshold_dict:
-                    info = self.processor.threshold_dict[node_name]
-                    node_name_zh = info.get('Name', '未知')
-                
-                if not pd.isna(raw_val) and node_name in self.processor.threshold_dict:
-                    info = self.processor.threshold_dict[node_name]
-                    g_min, g_max = info['Good_min'], info['Good_max']
-                    
-                    # -- 物理范围判定 --
-                    if raw_val > g_max and g_max != np.inf:
-                        pct = (raw_val - g_max) / (abs(g_max) + 1e-5) * 100
-                        phy_str = f"偏高 {pct:.1f}% (物理允许上限: {g_max})"
-                    elif raw_val < g_min and g_min != -np.inf:
-                        pct = (g_min - raw_val) / (abs(g_min) + 1e-5) * 100
-                        phy_str = f"偏低 {pct:.1f}% (物理允许下限: {g_min})"
-                    else:
-                        phy_str = f"正常 (物理范围: [{g_min}, {g_max}])"
-                        
-                    # -- 动态范围判定 --
-                    if not pd.isna(dyn_lower) and not pd.isna(dyn_upper):
-                        if raw_val > dyn_upper:
-                            dyn_str = f"异常突增 (近期基线: {dyn_med:.2f}, 动态上限: {dyn_upper:.2f})"
-                        elif raw_val < dyn_lower:
-                            dyn_str = f"异常突降 (近期基线: {dyn_med:.2f}, 动态下限: {dyn_lower:.2f})"
-                        else:
-                            dyn_str = f"平稳波动 (近期基线: {dyn_med:.2f}, 动态区间: [{dyn_lower:.2f}, {dyn_upper:.2f}])"
-
-                # 组装为清晰的结构化文本
-                dev_str = f"当前值: {raw_val:.2f} | 物理工况: {phy_str} | 动态趋势: {dyn_str}"
-
-                path_details.append({
-                    "node": node_name,
-                    "name": node_name_zh, 
-                    "anomaly_score": round(float(node_score), 4),
-                    "is_abnormal": bool(is_node_abnormal),
-                    "deviation": dev_str
-                })
-            
-            #  除诱发变量外,只要有 >= 1 个节点异常即触发溯源报警
-            if abnormal_other_nodes_count >= 1:
-                results.append({
-                    "trigger": t_name,
-                    "path": " -> ".join(path_names),
-                    "root_cause": path_names[-1],
-                    "details": path_details
-                })
-            
-        # 若所有触发链路均不满足 >= 1 个额外异常节点的条件
-        if not results:
-            return {
-                "status": "normal",
-                "message": "系统运行正常",
-            }
-            
-        return {
-            "status": "abnormal",
-            "results": results
-        }
-
-
-if __name__ == "__main__":
-    
-    # 模拟外部调用机制:每次固定传 2 小时的数据
-    test_file = os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}1.csv")
-    
-    if os.path.exists(test_file):
-        print(">>> 正在启动在线诊断引擎测试 (模式:读2小时,查末尾40分钟)...")
-        diagnoser = WaterPlantDiagnoser()
-        
-        # 假设原始数据采样率为 4 秒一次
-        # 2小时 = 7200秒 = 1800 行原始数据
-        CHUNK_SIZE = 1800 
-        
-        df_full = pd.read_csv(test_file, low_memory=False)
-        
-        if len(df_full) >= CHUNK_SIZE * 3:
-            import json
-            
-            # 模拟 1:传入 0~2 小时的数据
-            df_sim1 = df_full.iloc[0 : CHUNK_SIZE]
-            print("\n[测试 1] 传入 0~2 小时数据:")
-            print(json.dumps(diagnoser.api_predict(df_sim1), ensure_ascii=False, indent=2))
-            
-            # 模拟 2:传入 2~4 小时的数据
-            df_sim2 = df_full.iloc[CHUNK_SIZE : CHUNK_SIZE * 2]
-            print("\n[测试 2] 传入 2~4 小时数据:")
-            print(json.dumps(diagnoser.api_predict(df_sim2), ensure_ascii=False, indent=2))
-            
-            # 模拟 3:传入末尾 2 小时的数据
-            df_sim3 = df_full.iloc[-CHUNK_SIZE:]
-            print("\n[测试 3] 传入 4~6 小时数据:")
-            print(json.dumps(diagnoser.api_predict(df_sim3), ensure_ascii=False, indent=2))
-            
-        else:
-            import json
-            print("\n数据量不足以做三次两小时模拟,直接进行单次全量模拟:")
-            print(json.dumps(diagnoser.api_predict(df_full), ensure_ascii=False, indent=2))
-            
-    else:
-        print(f"测试文件 {test_file} 不存在,无法执行本地测试。")