
1. Add dynamic anomaly diagnosis model

zhanghao, 1 month ago
parent
commit
fde46f97e1

+ 54 - 0
models/Dynamic_anomaly_diagnosis/causal_structure.py

@@ -0,0 +1,54 @@
+# -*- coding: utf-8 -*-
+"""causal_structure.py: 第二层 - 物理因果结构构建"""
+import numpy as np
+import pandas as pd
+from config import config
+
+class CausalStructureBuilder:
+    def __init__(self, threshold_df):
+        self.df = threshold_df
+        self.sensor_list = self.df['ID'].tolist()
+        self.id_to_idx = {name: i for i, name in enumerate(self.sensor_list)}
+        self.num_sensors = len(self.sensor_list)
+        self.col_layer = self._find_col_by_keyword(config.KEYWORD_LAYER)
+        self.col_device = self._find_col_by_keyword(config.KEYWORD_DEVICE)
+
+    def _find_col_by_keyword(self, keyword):
+        if keyword in self.df.columns: return keyword
+        for col in self.df.columns:
+            if col.lower() == keyword.lower(): return col
+        raise ValueError(f"错误: 未找到列名包含 '{keyword}' 的列")
+
+    def build(self):
+        adj_matrix = np.zeros((self.num_sensors, self.num_sensors), dtype=int)
+        nodes_info = {}
+        for _, row in self.df.iterrows():
+            d_val = row[self.col_device]
+            dev_id = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None
+            try: l_val = int(row[self.col_layer])
+            except (ValueError, TypeError): l_val = -1  # unparseable layer marks the node invalid
+            nodes_info[row['ID']] = {'layer': l_val, 'device': dev_id}
+            
+        count_edges = 0
+        for i, src_name in enumerate(self.sensor_list):
+            src_node = nodes_info.get(src_name)
+            if not src_node or src_node['layer'] == -1: continue
+            src_l, src_d = src_node['layer'], src_node['device']
+            
+            for j, dst_name in enumerate(self.sensor_list):
+                if i == j: continue 
+                dst_node = nodes_info.get(dst_name)
+                if not dst_node or dst_node['layer'] == -1: continue
+                dst_l, dst_d = dst_node['layer'], dst_node['device']
+                
+                # Allowed: same layer, or exactly one layer lower (dst_l == src_l - 1)
+                is_layer_valid = (dst_l == src_l) or (dst_l == src_l - 1)
+                if not is_layer_valid: continue
+                    
+                is_dev_valid = True
+                if (src_d is not None) and (dst_d is not None):
+                    if src_d != dst_d: is_dev_valid = False
+                
+                if is_dev_valid:
+                    adj_matrix[i, j] = 1
+                    count_edges += 1
+        return {"sensor_list": self.sensor_list, "sensor_to_idx": self.id_to_idx, "adj_matrix": adj_matrix}

+ 98 - 0
models/Dynamic_anomaly_diagnosis/config.py

@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+"""config.py: 参数文件"""
+import os
+
+class Config:
+    # ---------------- Path configuration ----------------
+    # Project root directory
+    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+    # Directory holding the sensor time-series data files
+    DATASET_SENSOR_DIR = os.path.join(BASE_DIR, "datasets_xishan")
+    # Directory for trained model weights
+    MODEL_SAVE_DIR = os.path.join(BASE_DIR, "models")
+    # Directory for the final result reports
+    RESULT_SAVE_DIR = os.path.join(BASE_DIR, "results")
+    
+    # Threshold configuration file (sensor thresholds plus One_layer / Device metadata)
+    THRESHOLD_FILENAME = "sensor_threshold.xlsx"
+    # Expert knowledge base file (known historical anomaly links)
+    ABNORMAL_LINK_FILENAME = "abnormal_link.xlsx"
+    # Filename prefix of the sensor data files (e.g. data_process_1.csv)
+    SENSOR_FILE_PREFIX = "data_process_"
+    # Filename of the generated test evaluation report
+    TEST_RESULT_FILENAME = "Final_Test_Report.xlsx"
+    
+    # Keywords used to locate the key columns in the Excel sheets
+    KEYWORD_LAYER = 'One_layer' # column keyword for the causal-graph layers
+    KEYWORD_DEVICE = 'Device'   # column keyword for the device-consistency constraint
+    
+    # ---------------- Data processing parameters ----------------
+    # Range of file numbers to read (e.g. data_process_1 through data_process_119)
+    SENSOR_FILE_NUM_RANGE = (1, 119)
+    # Sampling interval of the raw data (seconds)
+    ORIGINAL_SAMPLE_INTERVAL = 4
+    # Target interval after downsampling (seconds), to reduce data volume and speed up computation
+    TARGET_SAMPLE_INTERVAL = 20
+    # Duration of one detection window (minutes)
+    WINDOW_DURATION_MIN = 40
+    # Data points per window = (40 min * 60 s) / 20 s = 120 points
+    POINTS_PER_WINDOW = int((WINDOW_DURATION_MIN * 60) / TARGET_SAMPLE_INTERVAL)
+    # Sliding-window step (in points); half a window, to increase sample coverage
+    WINDOW_STEP = POINTS_PER_WINDOW // 2
+    # Window validity threshold: a window is processed only if its non-null ratio exceeds this value (60%)
+    VALID_DATA_RATIO = 0.6
+    # Window anomaly threshold (decides whether a root-cause node is sufficiently anomalous)
+    WINDOW_ANOMALY_THRESHOLD = 0.2
+    # Train/test split ratio (0.8: first 80% of the timeline for training, last 20% for testing)
+    TRAIN_TEST_SPLIT = 0.8
+    # Activation threshold for trigger variables
+    TRIGGER_SCORE_THRESH = 0.5
+    
+    # ---------------- Anomaly-score weights and dynamic MAD parameters ----------------
+    # Weight of the absolute-threshold anomaly score
+    ABSOLUTE_SCORE_WEIGHT = 0.6
+    # Weight of the dynamic MAD anomaly score
+    DYNAMIC_SCORE_WEIGHT = 0.4
+    
+    # Rolling-window size for the dynamic MAD (360 points * 20 s = 2 hours)
+    MAD_HISTORY_WINDOW = 360
+    # Dynamic MAD decision threshold
+    MAD_THRESHOLD = 3.0
+    
+    # ---------------- Trigger variable list ----------------
+    # Sensors treated as "trigger variables"
+    TRIGGER_SENSORS = [
+        "UF1Per", "UF2Per", "UF3Per", "UF4Per",
+        "C.M.RO1_DB@DPT_1", "C.M.RO2_DB@DPT_1", "C.M.RO3_DB@DPT_1", "C.M.RO4_DB@DPT_1",
+        "C.M.RO1_DB@DPT_2", "C.M.RO2_DB@DPT_2", "C.M.RO3_DB@DPT_2", "C.M.RO4_DB@DPT_2",
+        "RO1_CSFlow", "RO2_CSFlow", "RO3_CSFlow", "RO4_CSFlow",
+        "RO1HSL", "RO2HSL", "RO3HSL", "RO4HSL",
+        "RO1_TYL", "RO2_TYL", "RO3_TYL", "RO4_TYL"
+    ]
+
+    # ---------------- Core reinforcement-learning parameters ----------------
+    # Minimum length of a generated anomaly link (prevents overly short paths)
+    MIN_PATH_LENGTH = 3
+    # Maximum length of a generated anomaly link (prevents divergence into overly long paths)
+    MAX_PATH_LENGTH = 6
+    
+    # Network architecture
+    EMBEDDING_DIM = 64  # node embedding dimension
+    HIDDEN_DIM = 256    # hidden-layer dimension
+    
+    # PPO (Proximal Policy Optimization) hyperparameters
+    PPO_LR = 3e-4             # learning rate
+    PPO_GAMMA = 0.90          # discount factor
+    PPO_EPS_CLIP = 0.2        # clipping range for PPO updates; limits the policy step size
+    PPO_K_EPOCHS = 10         # network update epochs per batch of collected data
+    PPO_BATCH_SIZE = 64       # training batch size
+    
+    # Training schedule
+    BC_EPOCHS = 50000         # behavior cloning (BC) epochs
+    RL_EPISODES = 5000        # reinforcement learning (PPO) episodes
+    
+    # Create the required directories automatically
+    for d in [MODEL_SAVE_DIR, DATASET_SENSOR_DIR, RESULT_SAVE_DIR]:
+        os.makedirs(d, exist_ok=True)
+
+config = Config()
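
A quick sanity check on the window arithmetic encoded above (all values derived from this config):

    from config import config

    # 40 min * 60 s / 20 s = 120 points per window, sliding by half a window
    assert config.POINTS_PER_WINDOW == 120
    assert config.WINDOW_STEP == 60
    # MAD history: 360 points * 20 s = 7200 s = 2 hours
    assert config.MAD_HISTORY_WINDOW * config.TARGET_SAMPLE_INTERVAL == 7200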

+ 269 - 0
models/Dynamic_anomaly_diagnosis/data_processing.py

@@ -0,0 +1,269 @@
+# -*- coding: utf-8 -*-
+"""data_processing.py: 第一层 - 异常量化表征 """
+import pandas as pd
+import numpy as np
+import os
+from tqdm import tqdm
+from config import config
+import gc
+from joblib import Parallel, delayed
+import multiprocessing
+
+def _process_single_file_task(file_idx, file_path, sensor_list, target_interval):
+    """
+    Read and downsample one file (runs in a worker process)
+    """
+    if not os.path.exists(file_path):
+        return None
+    
+    try:
+        # 1. Fast read
+        df_temp = pd.read_csv(file_path, index_col=None, low_memory=False)
+        if df_temp.empty:
+            return None
+            
+        # 2. Timestamp parsing ('mixed' format)
+        time_col = df_temp.columns[0]
+        df_temp[time_col] = pd.to_datetime(
+            df_temp[time_col], 
+            format='mixed', 
+            errors='coerce'
+        )
+        df_temp = df_temp.dropna(subset=[time_col]).set_index(time_col)
+        
+        # 3. Keep only known sensor columns
+        valid_cols = [c for c in df_temp.columns if c in sensor_list]
+        if not valid_cols:
+            return None
+            
+        df_valid = df_temp[valid_cols]
+        
+        # 4. Downsample immediately (4 s -> 20 s) & convert dtype
+        df_resampled = df_valid.resample(f"{target_interval}s").mean()
+        df_resampled = df_resampled.astype(np.float32)
+        
+        return df_resampled
+        
+    except Exception as e:
+        print(f"文件 {file_idx} 处理出错: {e}")
+        return None
+
+def _calculate_window_chunk(start_indices, values, win_len, valid_ratio, threshold_val=0.95):
+    """
+    Window-aggregation task chunk (runs in a worker process);
+    handles the nanquantile computation for a batch of windows
+    """
+    chunk_results = []
+    
+    for start in start_indices:
+        win_data = values[start : start + win_len, :]
+        
+        # Vectorized validity computation
+        # axis=0: count along the time axis
+        valid_counts = np.sum(~np.isnan(win_data), axis=0)
+        valid_ratios = valid_counts / win_len
+        
+        win_res = np.zeros(win_data.shape[1], dtype=np.float32)
+        valid_mask = valid_ratios >= valid_ratio
+        
+        if np.any(valid_mask):
+            # 95th-percentile score over the window, for sufficiently populated columns only
+            quantile_scores = np.nanquantile(win_data[:, valid_mask], threshold_val, axis=0)
+            win_res[valid_mask] = quantile_scores
+            
+        chunk_results.append(np.clip(win_res, 0, 1))
+        
+    return chunk_results
+
+class DataAnomalyProcessor:
+    def __init__(self):
+        self.threshold_path = os.path.join(config.BASE_DIR, config.THRESHOLD_FILENAME)
+        self.threshold_df = self._load_thresholds()
+        self.sensor_list = self.threshold_df['ID'].tolist()
+        self.threshold_dict = self.threshold_df.set_index('ID').to_dict('index')
+        
+        # Parallelism for I/O-bound reads vs. CPU-bound window aggregation
+        # (hard-coded for the target server; tune to the available cores)
+        self.n_jobs_io = 24
+        self.n_jobs_cpu = 64
+        
+    def _load_thresholds(self):
+        """加载阈值表"""
+        try:
+            df = pd.read_excel(self.threshold_path)
+        except Exception as e:
+            raise ValueError(f"读取阈值表失败: {e}")
+
+        required_cols = ['ID', 'Direction', 'Hard_min', 'Hard_max', 'Good_min', 'Good_max']
+        for col in required_cols:
+            if col not in df.columns:
+                for c in df.columns:
+                    if c.lower() == col.lower():
+                        df.rename(columns={c: col}, inplace=True)
+                        break
+        
+        cols_to_numeric = ['Hard_min', 'Hard_max', 'Good_min', 'Good_max', 'Alarm_time']
+        for c in cols_to_numeric:
+            if c in df.columns:
+                df[c] = pd.to_numeric(df[c], errors='coerce')
+        
+        df['Hard_min'] = df['Hard_min'].fillna(-np.inf)
+        df['Hard_max'] = df['Hard_max'].fillna(np.inf)
+        df['Good_min'] = df['Good_min'].fillna(-np.inf)
+        df['Good_max'] = df['Good_max'].fillna(np.inf)
+        df['Alarm_time'] = df['Alarm_time'].fillna(60)
+        return df
+
+    def _calculate_point_score_vectorized(self, series, sensor_name):
+        # Vectorized point-level scoring logic
+        if sensor_name not in self.threshold_dict:
+            return pd.Series(0.0, index=series.index, dtype=np.float32)
+        
+        info = self.threshold_dict[sensor_name]
+        vals = series.astype(np.float32).copy()
+        
+        # 1. Hard-threshold mask (values outside the hard limits are treated as invalid/missing)
+        mask_invalid = (vals < info['Hard_min']) | (vals > info['Hard_max'])
+        vals[mask_invalid] = np.nan
+        
+        # If everything is NaN, return all zeros
+        if vals.isna().all(): 
+            return pd.Series(0.0, index=vals.index, dtype=np.float32)
+        
+        # ==================== (A) Absolute-threshold score ====================
+        abs_scores = pd.Series(0.0, index=vals.index, dtype=np.float32)
+        direction = str(info['Direction']).strip().lower()
+        
+        if direction in ['low', 'both']:
+            mask_low = (vals < info['Good_min']) & (vals >= info['Hard_min'])
+            denom = info['Good_min'] - info['Hard_min']
+            if denom > 1e-5:
+                abs_scores[mask_low] = (info['Good_min'] - vals[mask_low]) / denom
+            else:
+                abs_scores[mask_low] = 1.0
+                
+        if direction in ['high', 'both']:
+            mask_high = (vals > info['Good_max']) & (vals <= info['Hard_max'])
+            denom = info['Hard_max'] - info['Good_max']
+            if denom > 1e-5:
+                abs_scores[mask_high] = (vals[mask_high] - info['Good_max']) / denom
+            else:
+                abs_scores[mask_high] = 1.0
+
+        # Amplify scores for sensors with short alarm times (more time-critical)
+        alarm_t = max(info['Alarm_time'], 1.0)
+        time_weight = 1.0 + (30.0 / alarm_t)
+        abs_scores = (abs_scores * time_weight).clip(0, 1)
+        
+        # ==================== (B) Dynamic MAD score ====================
+        # Rolling median over the history window
+        rolling_median = vals.rolling(
+            window=config.MAD_HISTORY_WINDOW, 
+            min_periods=1
+        ).median()
+        
+        # Absolute deviation |x_i - median|
+        abs_deviation = (vals - rolling_median).abs()
+        
+        # MAD = median(|x_i - median|)
+        rolling_mad = abs_deviation.rolling(
+            window=config.MAD_HISTORY_WINDOW, 
+            min_periods=1
+        ).median()
+        
+        # Dynamic score: map the deviation into [0, 1] by dividing by (MAD_THRESHOLD * MAD);
+        # the 1e-5 term guards against division by zero
+        dyn_scores = (abs_deviation / (rolling_mad * config.MAD_THRESHOLD + 1e-5)).clip(0, 1)
+        
+        # ==================== (C) Weighted combined score ====================
+        final_scores = (config.ABSOLUTE_SCORE_WEIGHT * abs_scores) + (config.DYNAMIC_SCORE_WEIGHT * dyn_scores)
+        
+        # Restore NaN for invalid data points
+        final_scores[mask_invalid] = np.nan 
+        
+        return final_scores
+
+    def process(self):
+        print(f">>> [Step 1] 数据处理启动 | 检测到 CPU 核心数: {multiprocessing.cpu_count()}")
+        
+        # 1. Parallel read & downsample
+        file_indices = list(range(config.SENSOR_FILE_NUM_RANGE[0], config.SENSOR_FILE_NUM_RANGE[1] + 1))
+        file_paths = [os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}{i}.csv") for i in file_indices]
+        
+        print(f"正在并行读取 {len(file_indices)} 个文件 (并发数: {self.n_jobs_io})...")
+        
+        # Run _process_single_file_task in parallel via joblib
+        results = Parallel(n_jobs=self.n_jobs_io, backend="loky", verbose=5)(
+            delayed(_process_single_file_task)(
+                idx, path, self.sensor_list, config.TARGET_SAMPLE_INTERVAL
+            ) for idx, path in zip(file_indices, file_paths)
+        )
+        
+        # Drop None results
+        all_series_list = [r for r in results if r is not None]
+        
+        if not all_series_list:
+            raise ValueError("没有读取到有效数据")
+
+        # 2. Merge data
+        print("Concatenating data along the time axis...")
+        resampled_df = pd.concat(all_series_list, axis=0).sort_index()
+        resampled_df = resampled_df[~resampled_df.index.duplicated(keep='first')]
+        
+        # Free memory
+        del all_series_list, results
+        gc.collect()
+
+        # 3. Point-level scores
+        print("Computing point-level anomaly scores...")
+        scores_dict = {}
+        for sensor in tqdm(self.sensor_list, desc="Scoring"):
+            if sensor in resampled_df.columns:
+                scores_dict[sensor] = self._calculate_point_score_vectorized(resampled_df[sensor], sensor)
+            else:
+                scores_dict[sensor] = pd.Series(np.nan, index=resampled_df.index, dtype=np.float32)
+        
+        point_score_df = pd.DataFrame(scores_dict)
+        point_score_df = point_score_df[self.sensor_list] # align column order
+        
+        # 4. Parallel window aggregation (CPU-bound)
+        print(f"Computing window quantiles in parallel (workers: {self.n_jobs_cpu})...")
+        
+        values = point_score_df.values
+        win_len = config.POINTS_PER_WINDOW
+        step = config.WINDOW_STEP
+        total_len = len(values)
+        
+        if total_len < win_len:
+            return np.array([]), np.array([]), self.threshold_df
+        
+        # Start index of every window (+1 so the final full window is kept)
+        all_start_indices = list(range(0, total_len - win_len + 1, step))
+        
+        # Split the indices into chunks; each chunk is dispatched to one worker
+        num_chunks = self.n_jobs_cpu * 4
+        chunk_size = max(1, len(all_start_indices) // num_chunks)
+        chunks = [all_start_indices[i:i + chunk_size] for i in range(0, len(all_start_indices), chunk_size)]
+        
+        # Parallel computation
+        chunk_results = Parallel(n_jobs=self.n_jobs_cpu, backend="loky", verbose=5)(
+            delayed(_calculate_window_chunk)(
+                chunk, values, win_len, config.VALID_DATA_RATIO
+            ) for chunk in chunks
+        )
+        
+        # Merge results
+        window_scores = [item for sublist in chunk_results for item in sublist]
+        
+        final_scores = np.nan_to_num(np.array(window_scores, dtype=np.float32), nan=0.0)
+        
+        # 5. Train/test split
+        total_samples = final_scores.shape[0]
+        if total_samples == 0:
+            raise ValueError("窗口数量为0")
+            
+        split_idx = int(total_samples * config.TRAIN_TEST_SPLIT)
+        train_scores = final_scores[:split_idx]
+        test_scores = final_scores[split_idx:]
+        
+        print(f"处理完成: 训练集 {len(train_scores)} | 测试集 {len(test_scores)}")
+        
+        return train_scores, test_scores, self.threshold_df
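
For intuition, a condensed, self-contained sketch of the scoring rule implemented by _calculate_point_score_vectorized, on a toy series with made-up thresholds (Good_max = 20, Hard_max = 30, direction 'high'); the hard-threshold masking and alarm-time weighting are omitted here:

    import numpy as np
    import pandas as pd

    vals = pd.Series([12.0, 15.0, 25.0, 29.0], dtype=np.float32)  # toy readings

    # (A) absolute score: linear ramp between Good_max (20) and Hard_max (30)
    abs_scores = ((vals - 20.0) / (30.0 - 20.0)).clip(0, 1)

    # (B) dynamic MAD score over a rolling window (3 points here, 360 in production)
    med = vals.rolling(3, min_periods=1).median()
    mad = (vals - med).abs().rolling(3, min_periods=1).median()
    dyn_scores = ((vals - med).abs() / (mad * 3.0 + 1e-5)).clip(0, 1)

    # (C) blend with the configured 0.6 / 0.4 weights
    final = 0.6 * abs_scores + 0.4 * dyn_scores
    print(final.round(3))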

+ 158 - 0
models/Dynamic_anomaly_diagnosis/input_format.txt

@@ -0,0 +1,158 @@
+index
+C.M.FT_ZJS@out
+C.M.LT_JSC@out
+UF_bump1_n
+UF_bump2_n
+UF_bump3_n
+UF_bump4_n
+C.M.UF_GSB1_fre@out
+C.M.UF_GSB2_fre@out
+C.M.UF_GSB3_fre@out
+C.M.UF_GSB4_fre@out
+C.M.UF_GSB1_A@out
+C.M.UF_GSB2_A@out
+C.M.UF_GSB3_A@out
+C.M.UF_GSB4_A@out
+UF_bump_avg
+water_in
+C.M.UF_PT_ZJS@out
+C.M.UF_Tur_ZJS@out
+C.M.RO_TT_ZJS@out
+PFTMP
+C.M.UF1_FT_JS@out
+C.M.UF2_FT_JS@out
+C.M.UF3_FT_JS@out
+C.M.UF4_FT_JS@out
+C.M.UF1_JSF_kd@out
+C.M.UF2_JSF_kd@out
+C.M.UF3_JSF_kd@out
+C.M.UF4_JSF_kd@out
+C.M.UF1_PT_JS@out
+C.M.UF2_PT_JS@out
+C.M.UF3_PT_JS@out
+C.M.UF4_PT_JS@out
+UF1_FluxF
+UF2_FluxF
+UF3_FluxF
+UF4_FluxF
+UF1Per
+UF2Per
+UF3Per
+UF4Per
+C.M.UF1_DB@press_PV
+C.M.UF2_DB@press_PV
+C.M.UF3_DB@press_PV
+C.M.UF4_DB@press_PV
+C.M.UF1_PT_CS@out
+C.M.UF2_PT_CS@out
+C.M.UF3_PT_CS@out
+C.M.UF4_PT_CS@out
+C.M.UF_PT_ZCS@out
+C.M.UF_FT_ZCS@out
+C.M.UF_Cl_ZCS@out
+C.M.UF_ORP_ZCS@out
+C.M.UF_PH_ZCS@out
+C.M.UF_Tur_ZCS@out
+RO_TotalFlow
+C.M.RO_Cond_ZJS@out
+C.M.RO_ORP_ZJS@out
+C.M.RO_PH_ZJS@out
+C.M.LT_HYJ1@out
+C.M.LT_HYJ2@out
+C.M.LT_ZGJ@out
+C.M.LT_SJJ@out
+RO1_GYB_n
+RO2_GYB_n
+RO3_GYB_n
+RO4_GYB_n
+C.M.RO1_GYB_fre@out
+C.M.RO2_GYB_fre@out
+C.M.RO3_GYB_fre@out
+C.M.RO4_GYB_fre@out
+C.M.RO1_GYB_A@out
+C.M.RO2_GYB_A@out
+C.M.RO3_GYB_A@out
+C.M.RO4_GYB_A@out
+C.M.RO1_GYBF_kd@out
+C.M.RO2_GYBF_kd@out
+C.M.RO3_GYBF_kd@out
+C.M.RO4_GYBF_kd@out
+C.M.RO1_FT_JS@out
+C.M.RO2_FT_JS@out
+C.M.RO3_FT_JS@out
+C.M.RO4_FT_JS@out
+C.M.RO1_PT_JS@out
+C.M.RO2_PT_JS@out
+C.M.RO3_PT_JS@out
+C.M.RO4_PT_JS@out
+C.M.RO1_DB@DPT_1
+C.M.RO2_DB@DPT_1
+C.M.RO3_DB@DPT_1
+C.M.RO4_DB@DPT_1
+C.M.RO1_PT_DJ1@out
+C.M.RO2_PT_DJ1@out
+C.M.RO3_PT_DJ1@out
+C.M.RO4_PT_DJ1@out
+RO1_DJB_n
+RO2_DJB_n
+RO3_DJB_n
+RO4_DJB_n
+C.M.RO1_DJB_fre@out
+C.M.RO2_DJB_fre@out
+C.M.RO3_DJB_fre@out
+C.M.RO4_DJB_fre@out
+C.M.RO1_DJB_A@out
+C.M.RO2_DJB_A@out
+C.M.RO3_DJB_A@out
+C.M.RO4_DJB_A@out
+C.M.RO1_PT_DJ2@out
+C.M.RO2_PT_DJ2@out
+C.M.RO3_PT_DJ2@out
+C.M.RO4_PT_DJ2@out
+C.M.RO1_DB@DPT_2
+C.M.RO2_DB@DPT_2
+C.M.RO3_DB@DPT_2
+C.M.RO4_DB@DPT_2
+C.M.RO1_PT_NS@out
+C.M.RO2_PT_NS@out
+C.M.RO3_PT_NS@out
+C.M.RO4_PT_NS@out
+C.M.RO1_FT_CS2@out
+C.M.RO2_FT_CS2@out
+C.M.RO3_FT_CS2@out
+C.M.RO4_FT_CS2@out
+RO1_SPK
+RO2_SPK
+RO3_SPK
+RO4_SPK
+C.M.RO1_FT_NS@out
+C.M.RO2_FT_NS@out
+C.M.RO3_FT_NS@out
+C.M.RO4_FT_NS@out
+RO1_CSFlow
+RO2_CSFlow
+RO3_CSFlow
+RO4_CSFlow
+RO1_FluxF
+RO2_FluxF
+RO3_FluxF
+RO4_FluxF
+C.M.RO1_PT_CS@out
+C.M.RO2_PT_CS@out
+C.M.RO3_PT_CS@out
+C.M.RO4_PT_CS@out
+C.M.RO1_Cond_CS@out
+C.M.RO2_Cond_CS@out
+C.M.RO3_Cond_CS@out
+C.M.RO4_Cond_CS@out
+RO1HSL
+RO2HSL
+RO3HSL
+RO4HSL
+RO1_TYL
+RO2_TYL
+RO3_TYL
+RO4_TYL
+C.M.RO_PT_ZCS@out
+RO_TCHFlow
+C.M.RO_Cond_ZCS@out

+ 32 - 0
models/Dynamic_anomaly_diagnosis/main.py

@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+"""main.py: 主运行文件"""
+from data_processing import DataAnomalyProcessor
+from causal_structure import CausalStructureBuilder
+from rl_tracing import RLTrainer
+
+def main():
+    
+    # 1. Data layer (returns pre-split train/test scores)
+    processor = DataAnomalyProcessor()
+    train_scores, test_scores, threshold_df = processor.process()
+    
+    # 2. Causal layer
+    builder = CausalStructureBuilder(threshold_df)
+    causal_graph = builder.build()
+    
+    # 3. Reinforcement-learning layer
+    # Initialized with the training set
+    trainer = RLTrainer(causal_graph, train_scores, threshold_df)
+    
+    # 3.1 Training phase
+    trainer.pretrain_bc()   # imitate the known expert links
+    trainer.train_ppo()     # explore unknown links
+    trainer.save_model()
+    
+    # 3.2 Evaluation phase (on the test set)
+    trainer.evaluate(test_scores)
+    
+    print("\n[Success] 所有任务执行完毕!")
+
+if __name__ == "__main__":
+    main()

Binary
models/Dynamic_anomaly_diagnosis/models/ppo_tracing_model.pth


+ 44 - 0
models/Dynamic_anomaly_diagnosis/output_format.txt

@@ -0,0 +1,44 @@
+The input follows the variable list in input_format.txt (unchanged from before) and should cover the past 2 hours of data (1800 rows at the 4 s raw interval). 40 minutes of data also runs, but the dynamic-range (MAD) computation may then be unreliable.
+The output falls into three cases: the input span is too short (it must exceed 40 min); no anomaly; or an anomaly is present, in which case the trigger variable, the anomaly path, and the root-cause variable are reported:
+{'status': 'warning', 'message': 'Insufficient data span: 6.6 min < 40 min'}
+{'status': 'normal', 'message': 'System operating normally'}
+{
+  "status": "abnormal",
+  "results": [
+    {
+      "trigger": "RO3_TYL",
+      "path": "RO3_TYL -> C.M.RO3_Cond_CS@out -> C.M.RO3_PT_NS@out -> C.M.RO3_PT_DJ1@out",
+      "root_cause": "C.M.RO3_PT_DJ1@out",
+      "details": [
+        {
+          "node": "RO3_TYL",
+          "name": "RO3脱盐率",
+          "anomaly_score": 0.5178,
+          "is_abnormal": true,
+          "deviation": "当前值: 94.78 | 物理工况: 偏低 2.8% (物理允许下限: 97.5) | 动态趋势: 平稳波动 (近期基线: 94.73, 动态区间: [94.59, 94.87])"
+        },
+        {
+          "node": "C.M.RO3_Cond_CS@out",
+          "name": "RO3产水电导",
+          "anomaly_score": 0.4,
+          "is_abnormal": true,
+          "deviation": "当前值: 106.80 | 物理工况: 正常 (物理范围: [10.0, 250.0]) | 动态趋势: 平稳波动 (近期基线: 107.43, 动态区间: [105.12, 109.73])"
+        },
+        {
+          "node": "C.M.RO3_PT_NS@out",
+          "name": "RO3二段浓水压力",
+          "anomaly_score": 0.4,
+          "is_abnormal": true,
+          "deviation": "当前值: 0.96 | 物理工况: 正常 (物理范围: [0.5, 1.05]) | 动态趋势: 平稳波动 (近期基线: 0.96, 动态区间: [0.96, 0.96])"
+        },
+        {
+          "node": "C.M.RO3_PT_DJ1@out",
+          "name": "RO3一段浓水压力",
+          "anomaly_score": 0.4,
+          "is_abnormal": true,
+          "deviation": "当前值: 0.90 | 物理工况: 正常 (物理范围: [0.02, 1.0]) | 动态趋势: 平稳波动 (近期基线: 0.89, 动态区间: [0.89, 0.90])"
+        }
+      ]
+    }
+  ]
+}
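
A minimal client-side sketch for dispatching on the three response shapes documented above (handle_response and its prints are illustrative, not part of the module; note that preprocessing failures use the key "msg" rather than "message"):

    def handle_response(resp: dict) -> None:
        status = resp.get("status")
        if status in ("warning", "error"):
            print("cannot diagnose:", resp.get("message") or resp.get("msg"))
        elif status == "normal":
            print(resp.get("message", "OK"))
        elif status == "abnormal":
            for r in resp["results"]:
                print(f'{r["trigger"]} | root cause: {r["root_cause"]}')
                print("  path:", r["path"])

    handle_response({"status": "normal", "message": "System operating normally"})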

+ 443 - 0
models/Dynamic_anomaly_diagnosis/rl_tracing.py

@@ -0,0 +1,443 @@
+# -*- coding: utf-8 -*-
+"""rl_tracing.py: 强化学习链路级异常溯源"""
+import torch
+import torch.nn as nn
+import torch.optim as optim
+import torch.nn.functional as F
+from torch.distributions import Categorical
+import numpy as np
+import pandas as pd
+import os
+from tqdm import tqdm
+from config import config
+
+# ----------------- 1. Environment -----------------
+class CausalTracingEnv:
+    def __init__(self, causal_graph, window_scores, threshold_df, expert_knowledge=None):
+        self.sensor_list = causal_graph['sensor_list']
+        self.map = causal_graph['sensor_to_idx']
+        self.idx_to_sensor = {v: k for k, v in self.map.items()}
+        self.adj = causal_graph['adj_matrix']
+        self.scores = window_scores
+        
+        self.expert_knowledge = expert_knowledge if expert_knowledge else {}
+        self.num_sensors = len(self.sensor_list)
+        
+        # 解析属性
+        self.node_props = {} 
+        col_one_layer = self._find_col(threshold_df, config.KEYWORD_LAYER)
+        col_device = self._find_col(threshold_df, config.KEYWORD_DEVICE)
+        
+        df_indexed = threshold_df.set_index('ID')
+        dict_one = df_indexed[col_one_layer].to_dict() if col_one_layer else {}
+        dict_dev = df_indexed[col_device].to_dict() if col_device else {}
+        
+        for name, idx in self.map.items():
+            l_val = dict_one.get(name, -1)
+            try: l_val = int(l_val)
+            except (ValueError, TypeError): l_val = 0
+            d_val = dict_dev.get(name, None)
+            d_val = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None
+            self.node_props[idx] = {'one_layer': l_val, 'device': d_val}
+
+        self.current_window_idx = 0
+        self.current_node_idx = 0
+        self.prev_node_idx = 0
+        self.trigger_node_idx = 0
+        self.path = []
+        self.current_expert_paths = []
+        self.target_roots = set()
+        
+    def _find_col(self, df, keyword):
+        if keyword in df.columns: return keyword
+        for c in df.columns:
+            if c.lower() == keyword.lower(): return c
+        return None
+
+    def reset(self, force_window_idx=None, force_trigger=None):
+        if force_window_idx is not None:
+            self.current_window_idx = force_window_idx
+            t_name = force_trigger
+        else:
+            found = False
+            for _ in range(100):
+                w_idx = np.random.randint(len(self.scores))
+                win_scores = self.scores[w_idx]
+                candidates = []
+                for t_name in config.TRIGGER_SENSORS:
+                    if t_name in self.map:
+                        idx = self.map[t_name]
+                        if win_scores[idx] > config.TRIGGER_SCORE_THRESH:
+                            candidates.append(t_name)
+                if candidates:
+                    self.current_window_idx = w_idx
+                    t_name = np.random.choice(candidates)
+                    found = True
+                    break
+            if not found:
+                self.current_window_idx = np.random.randint(len(self.scores))
+                t_name = list(self.map.keys())[0]
+
+        self.current_node_idx = self.map.get(t_name, 0)
+        self.trigger_node_idx = self.current_node_idx
+        self.prev_node_idx = self.current_node_idx
+        self.path = [self.current_node_idx]
+        
+        self.target_roots = set()
+        self.current_expert_paths = []
+        if self.current_node_idx in self.expert_knowledge:
+            entry = self.expert_knowledge[self.current_node_idx]
+            self.target_roots = entry['roots']
+            self.current_expert_paths = entry['paths']
+            
+        return self._get_state()
+    
+    def _get_state(self):
+        curr_score = self.scores[self.current_window_idx, self.current_node_idx]
+        prev_score = self.scores[self.current_window_idx, self.prev_node_idx]
+        curr_layer = self.node_props[self.current_node_idx]['one_layer'] / 20.0
+        
+        return (
+            torch.LongTensor([self.current_node_idx, self.prev_node_idx, self.trigger_node_idx]), 
+            torch.FloatTensor([curr_score, prev_score, curr_layer])
+        )
+    
+    def get_valid_actions(self, curr_idx):
+        neighbors = np.where(self.adj[curr_idx] == 1)[0]
+        curr_props = self.node_props[curr_idx]
+        curr_l, curr_d = curr_props['one_layer'], curr_props['device']
+        valid = []
+        for n in neighbors:
+            if n in self.path: continue 
+            tgt_props = self.node_props[n]
+            tgt_l, tgt_d = tgt_props['one_layer'], tgt_props['device']
+            if curr_l != 0 and tgt_l != 0:
+                if not ((tgt_l == curr_l) or (tgt_l == curr_l - 1)): continue
+            if (curr_d is not None) and (tgt_d is not None):
+                if curr_d != tgt_d: continue
+            valid.append(n)
+        return np.array(valid)
+    
+    def step(self, action_idx):
+        prev = self.current_node_idx
+        self.prev_node_idx = prev
+        self.current_node_idx = action_idx
+        self.path.append(action_idx)
+        
+        score_curr = self.scores[self.current_window_idx, self.current_node_idx]
+        
+        reward = 0.0
+        done = False
+        
+        # Reward shaping (imitation > root hit > score gradient)
+        in_expert_nodes = False
+        for e_path in self.current_expert_paths:
+            if action_idx in e_path:
+                in_expert_nodes = True
+                break
+        
+        if in_expert_nodes: reward += 2.0
+        else: reward -= 0.2
+            
+        if action_idx in self.target_roots:
+            reward += 10.0
+            done = True
+        
+        score_prev = self.scores[self.current_window_idx, prev]
+        diff = score_curr - score_prev
+        if diff > 0: reward += diff * 3.0
+        else: reward -= 0.5
+            
+        if len(self.path) >= config.MAX_PATH_LENGTH:
+            done = True
+            if action_idx not in self.target_roots: reward -= 5.0
+            
+        if score_curr < 0.15 and len(self.path) > 3:
+            done = True
+            reward -= 2.0
+            
+        return self._get_state(), reward, done, {}
+
+# ----------------- 2. Network -----------------
+class TargetDrivenActorCritic(nn.Module):
+    def __init__(self, num_sensors, embedding_dim=64, hidden_dim=256):
+        super().__init__()
+        self.node_emb = nn.Embedding(num_sensors, embedding_dim)
+        input_dim = (embedding_dim * 3) + 3 
+        
+        self.shared_net = nn.Sequential(
+            nn.Linear(input_dim, hidden_dim),
+            nn.ReLU(),
+            nn.LayerNorm(hidden_dim),
+            nn.Linear(hidden_dim, hidden_dim),
+            nn.ReLU()
+        )
+        self.actor = nn.Linear(hidden_dim, num_sensors)
+        self.critic = nn.Linear(hidden_dim, 1)
+        
+    def forward(self, int_data, float_data):
+        curr_emb = self.node_emb(int_data[:, 0])
+        prev_emb = self.node_emb(int_data[:, 1])
+        trig_emb = self.node_emb(int_data[:, 2]) 
+        x = torch.cat([curr_emb, prev_emb, trig_emb, float_data], dim=1)
+        feat = self.shared_net(x)
+        return self.actor(feat), self.critic(feat)
+
+# ----------------- 3. Trainer -----------------
+class RLTrainer:
+    def __init__(self, causal_graph, train_scores, threshold_df):
+        self.sensor_map = causal_graph['sensor_to_idx']
+        self.idx_to_sensor = {v: k for k, v in self.sensor_map.items()}
+        self.threshold_df = threshold_df
+        self.causal_graph = causal_graph
+        self.expert_knowledge, self.bc_samples, _ = self._load_expert_data()
+        
+        self.env = CausalTracingEnv(causal_graph, train_scores, threshold_df, self.expert_knowledge)
+        self.model = TargetDrivenActorCritic(self.env.num_sensors, config.EMBEDDING_DIM, config.HIDDEN_DIM)
+        self.optimizer = optim.Adam(self.model.parameters(), lr=config.PPO_LR)
+        
+    def _load_expert_data(self):
+        path = os.path.join(config.BASE_DIR, config.ABNORMAL_LINK_FILENAME)
+        kb_data = {} 
+        bc_data = [] 
+        if not os.path.exists(path): return kb_data, bc_data, None
+        
+        df = pd.read_excel(path)
+        for _, row in df.iterrows():
+            link = str(row.get('Link Path', ''))
+            if not link: continue
+            nodes_str = [n.strip() for n in link.replace('→', '->').split('->')]
+            path_nodes = nodes_str[::-1]
+            
+            ids = []
+            valid = True
+            for n in path_nodes:
+                if n in self.sensor_map: ids.append(self.sensor_map[n])
+                else: valid = False; break
+            if not valid or len(ids)<2: continue
+            
+            trigger_id = ids[0]
+            root_id = ids[-1]
+            
+            if trigger_id not in kb_data:
+                kb_data[trigger_id] = {'paths': [], 'roots': set(), 'logic': row.get('Process Logic Basis', '')}
+            kb_data[trigger_id]['paths'].append(ids)
+            kb_data[trigger_id]['roots'].add(root_id)
+            
+            for i in range(len(ids) - 1):
+                curr = ids[i]
+                prev = ids[max(0, i-1)]
+                nxt = ids[i+1]
+                bc_data.append(((curr, prev, trigger_id), nxt))
+                
+        return kb_data, bc_data, df
+
+    def pretrain_bc(self):
+        if not self.bc_samples: return
+        print(f"\n>>> [Step 3.1] 启动BC预训练 ({config.BC_EPOCHS}轮)...")
+        states_int = torch.LongTensor([list(s) for s, a in self.bc_samples])
+        actions = torch.LongTensor([a for s, a in self.bc_samples])
+        states_float = torch.zeros((len(states_int), 3))
+        states_float[:, 0] = 0.9  # assumed current-node anomaly score along expert paths
+        states_float[:, 1] = 0.8  # assumed previous-node anomaly score
+        
+        loss_fn = nn.CrossEntropyLoss()
+        pbar = tqdm(range(config.BC_EPOCHS), desc="BC Training")
+        for epoch in pbar:
+            logits, _ = self.model(states_int, states_float)
+            loss = loss_fn(logits, actions)
+            self.optimizer.zero_grad()
+            loss.backward()
+            self.optimizer.step()
+            if epoch%100==0: pbar.set_postfix({'Loss': f"{loss.item():.4f}"})
+
+    def train_ppo(self):
+        print(f"\n>>> [Step 3.2] 启动PPO训练 ({config.RL_EPISODES}轮)...")
+        pbar = tqdm(range(config.RL_EPISODES), desc="PPO Training")
+        rewards_hist = []
+        for _ in pbar:
+            state_data = self.env.reset()
+            done = False
+            ep_r = 0
+            b_int, b_float, b_act, b_lp, b_rew, b_mask = [], [], [], [], [], []
+            
+            while not done:
+                s_int = state_data[0].unsqueeze(0)
+                s_float = state_data[1].unsqueeze(0)
+                valid = self.env.get_valid_actions(s_int[0, 0].item())
+                if len(valid) == 0: break
+                
+                logits, _ = self.model(s_int, s_float)
+                mask = torch.full_like(logits, -1e9)
+                mask[0, valid] = 0
+                dist = Categorical(F.softmax(logits+mask, dim=-1))
+                action = dist.sample()
+                
+                next_s, r, done, _ = self.env.step(action.item())
+                b_int.append(s_int); b_float.append(s_float)
+                b_act.append(action); b_lp.append(dist.log_prob(action))
+                b_rew.append(r); b_mask.append(1-done)
+                state_data = next_s
+                ep_r += r
+            
+            if len(b_rew) > 1:
+                self._update_ppo(b_int, b_float, b_act, b_lp, b_rew, b_mask)
+            
+            rewards_hist.append(ep_r)
+            if len(rewards_hist)>50: rewards_hist.pop(0)
+            pbar.set_postfix({'AvgR': f"{np.mean(rewards_hist):.2f}"})
+
+    def _update_ppo(self, b_int, b_float, b_act, b_lp, b_rew, b_mask):
+        # Discounted returns, accumulated backwards through the episode
+        returns = []
+        R = 0
+        for r, m in zip(reversed(b_rew), reversed(b_mask)):
+            R = r + config.PPO_GAMMA * R * m
+            returns.insert(0, R)
+        returns = torch.tensor(returns)
+        
+        if returns.numel() > 1 and returns.std() > 1e-5:
+            returns = (returns - returns.mean()) / (returns.std() + 1e-5)
+        elif returns.numel() > 1:
+             returns = returns - returns.mean()
+        
+        s_int = torch.cat(b_int)
+        s_float = torch.cat(b_float)
+        act = torch.stack(b_act)
+        old_lp = torch.stack(b_lp).detach()
+        
+        for _ in range(config.PPO_K_EPOCHS):
+            logits, vals = self.model(s_int, s_float)
+            dist = Categorical(logits=logits)
+            new_lp = dist.log_prob(act)
+            ratio = torch.exp(new_lp - old_lp)
+            
+            surr1 = ratio * returns  # normalized returns serve directly as advantages (no GAE)
+            surr2 = torch.clamp(ratio, 1-config.PPO_EPS_CLIP, 1+config.PPO_EPS_CLIP) * returns
+            
+            v_pred = vals.squeeze()
+            if v_pred.shape != returns.shape:
+                v_pred = v_pred.view(-1)
+                returns = returns.view(-1)
+                
+            loss = -torch.min(surr1, surr2).mean() + 0.5 * F.mse_loss(v_pred, returns)
+            self.optimizer.zero_grad()
+            loss.backward()
+            self.optimizer.step()
+
+    def evaluate(self, test_scores):
+        print("\n>>> [Step 4] 评估测试集...")
+        self.model.eval()
+        results = []
+        
+        cnt_detected = 0
+        cnt_kb_covered = 0
+        cnt_path_match = 0
+        cnt_root_match = 0
+        cnt_new = 0
+        
+        env = CausalTracingEnv(self.causal_graph, test_scores, self.threshold_df, self.expert_knowledge)
+        
+        for win_idx in range(len(test_scores)):
+            scores = test_scores[win_idx]
+            active = []
+            for t_name in config.TRIGGER_SENSORS:
+                if t_name in self.sensor_map:
+                    idx = self.sensor_map[t_name]
+                    if scores[idx] > config.TRIGGER_SCORE_THRESH:
+                        active.append((t_name, idx))
+            
+            for t_name, t_idx in active:
+                cnt_detected += 1
+                state_data = env.reset(force_window_idx=win_idx, force_trigger=t_name)
+                path_idxs = [t_idx]
+                done = False
+                
+                while not done:
+                    s_int = state_data[0].unsqueeze(0)
+                    s_float = state_data[1].unsqueeze(0)
+                    valid = env.get_valid_actions(path_idxs[-1])
+                    if len(valid) == 0: break
+                    
+                    logits, _ = self.model(s_int, s_float)
+                    mask = torch.full_like(logits, -1e9)
+                    mask[0, valid] = 0
+                    act = torch.argmax(logits + mask, dim=1).item()
+                    state_data, _, done, _ = env.step(act)
+                    path_idxs.append(act)
+                    if len(path_idxs) >= config.MAX_PATH_LENGTH: done = True
+                
+                path_names = [self.idx_to_sensor[i] for i in path_idxs]
+                root = path_names[-1]
+                root_score = scores[self.sensor_map[root]]
+                
+                match_status = "未定义"
+                logic = ""
+                
+                if t_idx in self.expert_knowledge:
+                    cnt_kb_covered += 1
+                    entry = self.expert_knowledge[t_idx]
+                    logic = entry.get('logic', '')
+                    
+                    real_roots = [self.idx_to_sensor[r] for r in entry['roots']]
+                    rm = False
+                    for p_node in path_names:
+                        if p_node in real_roots:
+                            rm = True
+                            break
+                    
+                    pm = False
+                    path_set = set(path_idxs)
+                    for exp_p in entry['paths']:
+                        exp_set = set(exp_p)
+                        intersection = len(path_set.intersection(exp_set))
+                        union = len(path_set.union(exp_set))
+                        if union > 0 and (intersection / union) >= 0.6:
+                            pm = True
+                            break
+                    
+                    if pm:
+                        match_status = "path match"
+                        cnt_path_match += 1
+                        cnt_root_match += 1
+                    elif rm:
+                        match_status = "root-only match"
+                        cnt_root_match += 1
+                    else:
+                        match_status = "no match"
+                else:
+                    match_status = "new link"
+                    cnt_new += 1
+                
+                results.append({
+                    "Window ID": win_idx,
+                    "Trigger": t_name,
+                    "Traced Path": "->".join(path_names),
+                    "Root Cause": root,
+                    "Root Anomaly Score": f"{root_score:.3f}",
+                    "In Knowledge Base": "yes" if t_idx in self.expert_knowledge else "no",
+                    "Match Status": match_status,
+                    "Mechanism Notes": logic
+                })
+
+        denom = max(cnt_kb_covered, 1)
+        summary = [
+            {"Metric": "Total anomalous samples detected", "Value": cnt_detected},
+            {"Metric": "Samples covered by the knowledge base", "Value": cnt_kb_covered},
+            {"Metric": "Anomaly-link accuracy", "Value": f"{cnt_path_match/denom:.2%}"},
+            {"Metric": "Root-cause accuracy", "Value": f"{cnt_root_match/denom:.2%}"},
+            {"Metric": "Newly discovered anomaly patterns", "Value": cnt_new}
+        ]
+        
+        save_path = os.path.join(config.RESULT_SAVE_DIR, config.TEST_RESULT_FILENAME)
+        with pd.ExcelWriter(save_path, engine='openpyxl') as writer:
+            pd.DataFrame(summary).to_excel(writer, sheet_name='Sheet1_Summary', index=False)
+            pd.DataFrame(results).to_excel(writer, sheet_name='Sheet2_TestDetails', index=False)
+            
+        print("\n" + "="*50)
+        print(pd.DataFrame(summary).to_string(index=False))
+        print(f"\n文件已保存: {save_path}")
+        print("="*50)
+
+    def save_model(self):
+        path = os.path.join(config.MODEL_SAVE_DIR, "ppo_tracing_model.pth")
+        torch.save(self.model.state_dict(), path)
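
The action-masking trick used in both train_ppo and evaluate, shown in isolation with made-up logits: adding -1e9 to the logits of graph-invalid neighbors before the softmax drives their probabilities to effectively zero, so sampling (training) and argmax (evaluation) can only pick valid nodes:

    import torch
    import torch.nn.functional as F
    from torch.distributions import Categorical

    logits = torch.tensor([[2.0, 0.5, -1.0, 0.0]])  # scores for 4 candidate nodes
    valid = torch.tensor([1, 3])                    # only nodes 1 and 3 are reachable

    mask = torch.full_like(logits, -1e9)
    mask[0, valid] = 0
    dist = Categorical(F.softmax(logits + mask, dim=-1))
    print(dist.probs)  # ~[0.00, 0.62, 0.00, 0.38]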

+ 291 - 0
models/Dynamic_anomaly_diagnosis/test.py

@@ -0,0 +1,291 @@
+# -*- coding: utf-8 -*-
+"""test.py: 在线诊断接口"""
+import os
+import pandas as pd
+import numpy as np
+import torch
+from config import config
+from data_processing import DataAnomalyProcessor
+from causal_structure import CausalStructureBuilder
+from rl_tracing import RLTrainer, CausalTracingEnv
+
+class WaterPlantDiagnoser:
+    def __init__(self):
+        
+        # 1. Initialize the data processor (loads thresholds and computes scores; the anomaly representation matches training exactly)
+        self.processor = DataAnomalyProcessor()
+        self.sensor_list = self.processor.sensor_list
+        self.threshold_df = self.processor.threshold_df
+        
+        # 2. Build the causal graph (Layer 2)
+        self.builder = CausalStructureBuilder(self.threshold_df)
+        self.causal_graph = self.builder.build()
+        
+        # 3. Load the reinforcement-learning model (Layer 3)
+        dummy_scores = np.zeros((1, len(self.sensor_list)), dtype=np.float32)
+        self.trainer = RLTrainer(self.causal_graph, dummy_scores, self.threshold_df)
+        
+        model_path = os.path.join(config.MODEL_SAVE_DIR, "ppo_tracing_model.pth")
+        if not os.path.exists(model_path):
+            print(f"[Warning] 未找到模型文件: {model_path}。如果是测试环境,请确保已有预训练模型。")
+        else:
+            state_dict = torch.load(model_path, map_location=torch.device('cpu'), weights_only=True)
+            self.trainer.model.load_state_dict(state_dict)
+        
+        self.trainer.model.eval()
+        
+    def _preprocess_dataframe(self, df_input):
+        """
+        Strictly reproduces the training-time data-processing logic
+        """
+        try:
+            df = df_input.copy()
+            # 1. Timestamp parsing
+            time_col = df.columns[0]
+            df[time_col] = pd.to_datetime(df[time_col], format='mixed', errors='coerce')
+            df = df.dropna(subset=[time_col]).set_index(time_col)
+            
+            # 2. Keep only known sensor columns
+            valid_cols = [c for c in df.columns if c in self.sensor_list]
+            if not valid_cols: return None
+            df_valid = df[valid_cols]
+            
+            # 3. Downsample (4 s -> 20 s)
+            df_resampled = df_valid.resample(f"{config.TARGET_SAMPLE_INTERVAL}s").mean()
+            df_resampled = df_resampled.astype(np.float32)
+            
+            return df_resampled
+        except Exception as e:
+            print(f"数据预处理错误: {e}")
+            return None
+
+    def api_predict(self, df_raw):
+        """
+        Public API: each call receives the past 2 hours of data and diagnoses the latest 40 minutes
+        """
+        # --- Layer 1: preprocessing ---
+        df_resampled = self._preprocess_dataframe(df_raw)
+        if df_resampled is None or df_resampled.empty:
+            return {"status": "error", "msg": "数据预处理失败或数据为空"}
+            
+        # --- Layer 1: anomaly scoring (combined absolute + MAD score) ---
+        scores_dict = {}
+        for sensor in self.sensor_list:
+            if sensor in df_resampled.columns:
+                scores_dict[sensor] = self.processor._calculate_point_score_vectorized(
+                    df_resampled[sensor], sensor
+                )
+            else:
+                # Strict alignment: use np.nan, exactly as in training
+                scores_dict[sensor] = pd.Series(np.nan, index=df_resampled.index, dtype=np.float32)
+        
+        point_score_df = pd.DataFrame(scores_dict)[self.sensor_list]
+        
+        # --- Layer 1: take the latest 40 minutes for window aggregation and raw-value means ---
+        req_points = config.POINTS_PER_WINDOW  # 40 min = 120 points at the 20 s interval
+        total_points = len(point_score_df)
+        
+        if total_points < req_points:
+            return {
+                "status": "warning", 
+                "message": f"传入数据不足40分钟 (当前: {total_points*20}s, 需要: {req_points*20}s)"
+            }
+            
+        # Slice: the last 40 minutes of scores (for the anomaly score) and of raw data (for deviation percentages)
+        latest_40min_scores = point_score_df.iloc[-req_points:].values
+        latest_40min_raw = df_resampled.iloc[-req_points:]
+        # Mean of the raw sensor values over these 40 minutes, used as the displayed reference value
+        real_time_values = latest_40min_raw.mean(skipna=True)
+        
+        # Valid-data ratios and the 95th percentile
+        valid_counts = np.sum(~np.isnan(latest_40min_scores), axis=0)
+        valid_ratios = valid_counts / req_points
+        
+        current_window_scores = np.zeros(latest_40min_scores.shape[1], dtype=np.float32)
+        valid_mask = valid_ratios >= config.VALID_DATA_RATIO
+        
+        if np.any(valid_mask):
+            current_window_scores[valid_mask] = np.nanquantile(latest_40min_scores[:, valid_mask], 0.95, axis=0)
+        
+        current_window_scores = np.clip(current_window_scores, 0, 1)
+        
+        # --- Layers 2 & 3: trigger detection and tracing ---
+        results = []
+        active_triggers = []
+        
+        for t_name in config.TRIGGER_SENSORS:
+            if t_name in self.causal_graph['sensor_to_idx']:
+                idx = self.causal_graph['sensor_to_idx'][t_name]
+                score = current_window_scores[idx]
+                if score > config.TRIGGER_SCORE_THRESH:
+                    active_triggers.append((t_name, idx, score))
+
+        if not active_triggers:
+            return {
+                "status": "normal", 
+                "message": "系统运行正常", 
+            }
+
+        # Build a temporary environment for tracing
+        env_scores = current_window_scores.reshape(1, -1)
+        temp_env = CausalTracingEnv(self.causal_graph, env_scores, self.threshold_df, self.trainer.expert_knowledge)
+        
+        for t_name, t_idx, t_score in active_triggers:
+            state_data = temp_env.reset(force_window_idx=0, force_trigger=t_name)
+            path_idxs = [t_idx]
+            done = False
+            
+            while not done:
+                s_int = state_data[0].unsqueeze(0)
+                s_float = state_data[1].unsqueeze(0)
+                valid = temp_env.get_valid_actions(path_idxs[-1])
+                
+                if len(valid) == 0: break
+                
+                logits, _ = self.trainer.model(s_int, s_float)
+                mask = torch.full_like(logits, -1e9)
+                mask[0, valid] = 0
+                act = torch.argmax(logits + mask, dim=1).item()
+                
+                state_data, _, done, _ = temp_env.step(act)
+                path_idxs.append(act)
+                if len(path_idxs) >= config.MAX_PATH_LENGTH: done = True
+            
+            path_names = [self.trainer.idx_to_sensor[i] for i in path_idxs]
+            
+            # --- Steps 1 & 2: link validity and dual deviation quantification ---
+            path_details = []
+            abnormal_other_nodes_count = 0
+            
+            for node_idx, node_name in zip(path_idxs, path_names):
+                node_score = current_window_scores[node_idx]
+                is_node_abnormal = node_score > config.WINDOW_ANOMALY_THRESHOLD
+                
+                # Count nodes (other than the trigger itself) that meet the anomaly criterion
+                if node_idx != t_idx and is_node_abnormal:
+                    abnormal_other_nodes_count += 1
+                
+                # 1. Current reference value
+                raw_val = real_time_values.get(node_name, np.nan)
+                
+                # 2. Recompute this node's dynamic baseline and bounds on the fly
+                dyn_lower, dyn_upper, dyn_med = np.nan, np.nan, np.nan
+                if node_name in df_resampled.columns:
+                    vals = df_resampled[node_name].astype(np.float32)
+                    # Reconstruct the rolling-MAD history state
+                    r_med = vals.rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()
+                    r_mad = (vals - r_med).abs().rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()
+                    
+                    # Take the last value as the baseline
+                    last_med = r_med.iloc[-1]
+                    last_mad = r_mad.iloc[-1]
+                    dyn_lower = last_med - config.MAD_THRESHOLD * last_mad
+                    dyn_upper = last_med + config.MAD_THRESHOLD * last_mad
+                    dyn_med = last_med
+
+                # 3. Build display strings for the physical range and the dynamic range
+                phy_str = "undefined"
+                dyn_str = "undefined"
+                node_name_zh = "unknown"
+                
+                # Human-readable name from the threshold table
+                if node_name in self.processor.threshold_dict:
+                    info = self.processor.threshold_dict[node_name]
+                    node_name_zh = info.get('Name', 'unknown')
+                
+                if not pd.isna(raw_val) and node_name in self.processor.threshold_dict:
+                    info = self.processor.threshold_dict[node_name]
+                    g_min, g_max = info['Good_min'], info['Good_max']
+                    
+                    # -- Physical-range check --
+                    if raw_val > g_max and g_max != np.inf:
+                        pct = (raw_val - g_max) / (abs(g_max) + 1e-5) * 100
+                        phy_str = f"偏高 {pct:.1f}% (物理允许上限: {g_max})"
+                    elif raw_val < g_min and g_min != -np.inf:
+                        pct = (g_min - raw_val) / (abs(g_min) + 1e-5) * 100
+                        phy_str = f"偏低 {pct:.1f}% (物理允许下限: {g_min})"
+                    else:
+                        phy_str = f"正常 (物理范围: [{g_min}, {g_max}])"
+                        
+                    # -- Dynamic-range check --
+                    if not pd.isna(dyn_lower) and not pd.isna(dyn_upper):
+                        if raw_val > dyn_upper:
+                            dyn_str = f"异常突增 (近期基线: {dyn_med:.2f}, 动态上限: {dyn_upper:.2f})"
+                        elif raw_val < dyn_lower:
+                            dyn_str = f"异常突降 (近期基线: {dyn_med:.2f}, 动态下限: {dyn_lower:.2f})"
+                        else:
+                            dyn_str = f"平稳波动 (近期基线: {dyn_med:.2f}, 动态区间: [{dyn_lower:.2f}, {dyn_upper:.2f}])"
+
+                # Assemble into a clear structured string
+                dev_str = f"Current value: {raw_val:.2f} | Physical condition: {phy_str} | Dynamic trend: {dyn_str}"
+
+                path_details.append({
+                    "node": node_name,
+                    "name": node_name_zh, 
+                    "anomaly_score": round(float(node_score), 4),
+                    "is_abnormal": bool(is_node_abnormal),
+                    "deviation": dev_str
+                })
+            
+            # Raise a tracing alarm as soon as >= 1 node besides the trigger is anomalous
+            if abnormal_other_nodes_count >= 1:
+                results.append({
+                    "trigger": t_name,
+                    "path": " -> ".join(path_names),
+                    "root_cause": path_names[-1],
+                    "details": path_details
+                })
+            
+        # No triggered link satisfied the >= 1 extra anomalous node condition
+        if not results:
+            return {
+                "status": "normal",
+                "message": "系统运行正常",
+            }
+            
+        return {
+            "status": "abnormal",
+            "results": results
+        }
+
+
+if __name__ == "__main__":
+    
+    # Simulate the external caller: each call passes exactly 2 hours of data
+    test_file = os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}1.csv")
+    
+    if os.path.exists(test_file):
+        print(">>> 正在启动在线诊断引擎测试 (模式:读2小时,查末尾40分钟)...")
+        diagnoser = WaterPlantDiagnoser()
+        
+        # Assume a raw sampling interval of 4 seconds:
+        # 2 hours = 7200 s = 1800 raw rows
+        CHUNK_SIZE = 1800 
+        
+        df_full = pd.read_csv(test_file, low_memory=False)
+        
+        if len(df_full) >= CHUNK_SIZE * 3:
+            import json
+            
+            # Simulation 1: pass hours 0-2
+            df_sim1 = df_full.iloc[0 : CHUNK_SIZE]
+            print("\n[Test 1] Data from hours 0-2:")
+            print(json.dumps(diagnoser.api_predict(df_sim1), ensure_ascii=False, indent=2))
+            
+            # Simulation 2: pass hours 2-4
+            df_sim2 = df_full.iloc[CHUNK_SIZE : CHUNK_SIZE * 2]
+            print("\n[Test 2] Data from hours 2-4:")
+            print(json.dumps(diagnoser.api_predict(df_sim2), ensure_ascii=False, indent=2))
+            
+            # Simulation 3: pass the final 2 hours of the file
+            df_sim3 = df_full.iloc[-CHUNK_SIZE:]
+            print("\n[Test 3] Data from the final 2 hours:")
+            print(json.dumps(diagnoser.api_predict(df_sim3), ensure_ascii=False, indent=2))
+            
+        else:
+            import json
+            print("\n数据量不足以做三次两小时模拟,直接进行单次全量模拟:")
+            print(json.dumps(diagnoser.api_predict(df_full), ensure_ascii=False, indent=2))
+            
+    else:
+        print(f"测试文件 {test_file} 不存在,无法执行本地测试。")