
1. Added Longting dynamic anomaly diagnosis code 2. Code cleanup

zhanghao 3 weeks ago
parent
commit
4d4c1334de

+ 38 - 0
models/Dynamic_anomaly_diagnosis/README.md

@@ -0,0 +1,38 @@
+# Dual-Membrane (UF-RO) Dynamic Anomaly Diagnosis System
+
+An intelligent diagnosis engine for water-treatment processes, built on "dual anomaly quantification" and reinforcement-learning (PPO) causal root-cause tracing.
+This project provides an end-to-end solution for plant ultrafiltration/reverse-osmosis (UF-RO) dual-membrane systems, from data cleaning and anomaly early warning to root-cause localization.
+
+## 🌟 Architecture: High Cohesion, Low Coupling
+The project uses a decoupled, plugin-style architecture of **"core algorithm engine + independent per-plant workspaces"**:
+- **Shared engine layer**: all plants share one set of core data-processing, graph-construction, and RL path-finding algorithms.
+- **Plant workspaces**: each plant (e.g. `xishan`, `longting`) has its own directory,
+                   giving complete physical isolation of configs, datasets, expert knowledge bases, and model weights,
+                   which greatly improves the system's portability and maintainability.
+
+---
+
+## 📂 Directory Layout
+
+```text
+project_root/
+├── core engine layer (shared code)
+│   ├── data_processing.py      # Layer 1: time-series preprocessing and dual anomaly quantification (absolute thresholds + dynamic MAD)
+│   ├── causal_structure.py     # Layer 2: physical causal-graph construction from process layers and device constraints
+│   ├── rl_tracing.py           # Layer 3: PPO root-cause tracing on an Actor-Critic architecture
+│   ├── config.py               # dynamic config loader (relative paths + YAML parsing)
+│   ├── main.py                 # main entry point for training and evaluation
+│   └── test.py                 # online diagnosis / real-time test interface
+│
+├── xishan/                     # 🏆 Xishan plant workspace
+│   ├── config.yaml               # Xishan config (paths, algorithm hyperparameters, sensor lists)
+│   ├── sensor_threshold.xlsx     # Xishan sensor physical thresholds and layer definitions
+│   ├── abnormal_link.xlsx        # Xishan expert knowledge base of historical anomaly links (for BC pretraining)
+│   ├── ppo_tracing_model.pth     # trained Xishan-specific PPO model weights
+│
+│
+└── longting/                   # 🏆 Longting plant workspace
+    ├── config.yaml               
+    ├── sensor_threshold.xlsx     
+    ├── abnormal_link.xlsx        
+    ├── ppo_tracing_model.pth     
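
Once a workspace is in place, every entry point takes the plant folder name via the `-p` flag: `python main.py -p longting` runs the full training-and-evaluation pipeline for that workspace, and `python test.py -p longting` exercises the online diagnosis interface against its local data.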

+ 87 - 0
models/Dynamic_anomaly_diagnosis/causal_structure.py

@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+"""
+causal_structure.py: Layer 2 - physical causal structure construction
+This module turns the expert knowledge base (process-layer assignments, device ownership) into a graph structure (an adjacency matrix).
+"""
+import numpy as np
+import pandas as pd
+from config import config
+
+class CausalStructureBuilder:
+    def __init__(self, threshold_df):
+        self.df = threshold_df
+        self.sensor_list = self.df['ID'].tolist()
+        self.id_to_idx = {name: i for i, name in enumerate(self.sensor_list)}
+        self.num_sensors = len(self.sensor_list)
+        self.col_layer = self._find_col_by_keyword(config.KEYWORD_LAYER)
+        self.col_device = self._find_col_by_keyword(config.KEYWORD_DEVICE)
+
+    def _find_col_by_keyword(self, keyword):
+        if keyword in self.df.columns: return keyword
+        for col in self.df.columns:
+            if col.lower() == keyword.lower(): return col
+        raise ValueError(f"Error: no column matching '{keyword}' was found")
+
+    def build(self):
+        """
+        Core construction logic: generate the directed connections between sensors (adjacency matrix) from the rules.
+        Returns: sensor list, index-mapping dict, and adjacency matrix (adj_matrix).
+        """
+        # Initialize an N x N zero matrix: 0 = no edge, 1 = edge
+        adj_matrix = np.zeros((self.num_sensors, self.num_sensors), dtype=int)
+        nodes_info = {}
+        
+        # 1. Parse every node's attributes into a dict
+        for _, row in self.df.iterrows():
+            d_val = row[self.col_device]
+            
+            # Clean the device name: handle empty strings, NaN, and other bad input
+            dev_id = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None
+           
+            # Clean the layer number: if undefined or malformed, assign -1 so the node is excluded from causal tracing
+            try: l_val = int(row[self.col_layer])
+            except: l_val = -1
+            
+            nodes_info[row['ID']] = {'layer': l_val, 'device': dev_id}
+            
+        count_edges = 0
+        
+        # 2. Compare every sensor pair in a nested loop to decide whether a "causal path" exists between them
+        for i, src_name in enumerate(self.sensor_list):
+            src_node = nodes_info.get(src_name)
+            
+            # Skip if the source node has no valid layer
+            if not src_node or src_node['layer'] == -1: continue
+            src_l, src_d = src_node['layer'], src_node['device']
+            
+            for j, dst_name in enumerate(self.sensor_list):
+                # Exclude self-loops (prevents dead loops during graph traversal)
+                if i == j: continue 
+            
+                dst_node = nodes_info.get(dst_name)
+                
+                # Skip if the destination node has no valid layer
+                if not dst_node or dst_node['layer'] == -1: continue
+                dst_l, dst_d = dst_node['layer'], dst_node['device']
+                
+                # ==================== (A) Layer Constraint ====================
+                # The plant process propagates from upstream to downstream, so tracing runs bottom-up.
+                # dst_l == src_l: allow lateral moves within the same layer (e.g. other sensors in the same stage)
+                # dst_l == src_l - 1: allow moving one layer upstream to look for the cause. Never skip layers or move downstream.
+                is_layer_valid = (dst_l == src_l) or (dst_l == src_l - 1)
+                if not is_layer_valid: continue
+                    
+                # ==================== (B) Device Constraint ====================
+                # If two sensors explicitly belong to different physical devices (e.g. the RO1 membrane vs. the RO2 membrane),
+                # they are physically isolated with no causal relation, so the edge is cut.
+                is_dev_valid = True
+                if (src_d is not None) and (dst_d is not None):
+                    if src_d != dst_d: is_dev_valid = False
+                
+                # Only when both the layer constraint and the device constraint hold is the path valid
+                if is_dev_valid:
+                    adj_matrix[i, j] = 1
+                    count_edges += 1
+        return {"sensor_list": self.sensor_list, "sensor_to_idx": self.id_to_idx, "adj_matrix": adj_matrix}
+    
+    
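
To make the two edge rules concrete, here is a standalone sketch of the constraint logic in `build()` on a hypothetical four-sensor table (names invented for illustration; the real builder reads layers and devices from `sensor_threshold.xlsx`):

```python
# Standalone restatement of the layer and device constraints from build().
import numpy as np

nodes = {
    # name: (layer, device); device None means shared/unassigned equipment
    "feed_flow": (2, None),   # hypothetical upstream sensor
    "RO1_feed":  (3, "RO1"),
    "RO1_perm":  (3, "RO1"),
    "RO2_perm":  (3, "RO2"),
}
names = list(nodes)
adj = np.zeros((len(names), len(names)), dtype=int)

for i, src in enumerate(names):
    src_l, src_d = nodes[src]
    for j, dst in enumerate(names):
        if i == j:
            continue
        dst_l, dst_d = nodes[dst]
        # (A) layer constraint: same layer, or exactly one layer upstream
        if not (dst_l == src_l or dst_l == src_l - 1):
            continue
        # (B) device constraint: explicitly different devices are isolated
        if src_d is not None and dst_d is not None and src_d != dst_d:
            continue
        adj[i, j] = 1

print(adj)
# RO1_feed links to RO1_perm (same layer and device) and to feed_flow
# (one layer upstream, shared device); RO1_perm -> RO2_perm is cut by
# the device rule even though both sit on the same layer.
```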

+ 88 - 0
models/Dynamic_anomaly_diagnosis/config.py

@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+"""config.py: 纯相对路径动态配置加载器"""
+import os
+import yaml
+
+class Config:
+    def __init__(self):
+        self._config_data = {}
+        self.PLANT_NAME = ""
+        self.PLANT_DIR = "" 
+        
+    def load(self, plant_name: str):
+        """传入水厂名称 (如 'longting'),自动挂载该水厂所有相对路径"""
+        self.PLANT_NAME = plant_name
+        self.PLANT_DIR = f"./{plant_name}"
+        
+        yaml_path = f"{self.PLANT_DIR}/config.yaml"
+        if not os.path.exists(yaml_path):
+            raise FileNotFoundError(f"Config file not found: {yaml_path}")
+            
+        with open(yaml_path, 'r', encoding='utf-8') as f:
+            self._config_data = yaml.safe_load(f)
+            
+        self._parse_config()
+        self._init_directories()
+
+    def _parse_config(self):
+        files = self._config_data.get('files', {})
+        
+        # 1. Directory paths
+        self.DATASET_SENSOR_DIR = f"{self.PLANT_DIR}/datasets"
+        self.RESULT_SAVE_DIR = f"{self.PLANT_DIR}/results"
+        self.MODEL_SAVE_DIR = self.PLANT_DIR  # models are saved in the plant's root directory
+        
+        # 2. Full relative file paths
+        self.THRESHOLD_FILENAME = f"{self.PLANT_DIR}/{files.get('threshold_filename', 'sensor_threshold.xlsx')}"
+        self.ABNORMAL_LINK_FILENAME = f"{self.PLANT_DIR}/{files.get('abnormal_link_filename', 'abnormal_link.xlsx')}"
+        self.MODEL_FILE_PATH = f"{self.PLANT_DIR}/{files.get('model_filename', 'ppo_tracing_model.pth')}"
+        self.TEST_RESULT_FILENAME = files.get('test_result_filename', 'Final_Test_Report.xlsx') # left for pd.ExcelWriter to handle
+        self.SENSOR_FILE_PREFIX = files.get('sensor_file_prefix', 'data_process_')
+
+        # 3. Sensors and keywords
+        sensors = self._config_data.get('sensors', {})
+        self.KEYWORD_LAYER = sensors.get('keyword_layer', 'One_layer')
+        self.KEYWORD_DEVICE = sensors.get('keyword_device', 'Device')
+        self.TRIGGER_SENSORS = sensors.get('trigger_sensors', [])
+
+        # 4. Data-processing parameters
+        data = self._config_data.get('data_processing', {})
+        self.SENSOR_FILE_NUM_RANGE = tuple(data.get('sensor_file_num_range', (1, 10)))
+        self.ORIGINAL_SAMPLE_INTERVAL = data.get('original_sample_interval', 4)
+        self.TARGET_SAMPLE_INTERVAL = data.get('target_sample_interval', 20)
+        self.WINDOW_DURATION_MIN = data.get('window_duration_min', 40)
+        
+        # Derived variables
+        self.POINTS_PER_WINDOW = int((self.WINDOW_DURATION_MIN * 60) / self.TARGET_SAMPLE_INTERVAL)
+        self.WINDOW_STEP = self.POINTS_PER_WINDOW // 2
+        
+        self.VALID_DATA_RATIO = data.get('valid_data_ratio', 0.6)
+        self.WINDOW_ANOMALY_THRESHOLD = data.get('window_anomaly_threshold', 0.2)
+        self.TRAIN_TEST_SPLIT = data.get('train_test_split', 0.8)
+        self.TRIGGER_SCORE_THRESH = data.get('trigger_score_thresh', 0.5)
+        self.ABSOLUTE_SCORE_WEIGHT = data.get('absolute_score_weight', 0.6)
+        self.DYNAMIC_SCORE_WEIGHT = data.get('dynamic_score_weight', 0.4)
+        self.MAD_HISTORY_WINDOW = data.get('mad_history_window', 360)
+        self.MAD_THRESHOLD = data.get('mad_threshold', 3.0)
+
+        # 5. Reinforcement-learning parameters
+        rl = self._config_data.get('rl_params', {})
+        self.MIN_PATH_LENGTH = rl.get('min_path_length', 3)
+        self.MAX_PATH_LENGTH = rl.get('max_path_length', 6)
+        self.EMBEDDING_DIM = rl.get('embedding_dim', 64)
+        self.HIDDEN_DIM = rl.get('hidden_dim', 256)
+        self.PPO_LR = float(rl.get('ppo_lr', 3e-4))
+        self.PPO_GAMMA = rl.get('ppo_gamma', 0.90)
+        self.PPO_EPS_CLIP = rl.get('ppo_eps_clip', 0.2)
+        self.PPO_K_EPOCHS = rl.get('ppo_k_epochs', 10)
+        self.PPO_BATCH_SIZE = rl.get('ppo_batch_size', 64)
+        self.BC_EPOCHS = rl.get('bc_epochs', 20000)
+        self.RL_EPISODES = rl.get('rl_episodes', 20)
+
+    def _init_directories(self):
+        """确保当前水厂的数据和结果目录存在"""
+        os.makedirs(self.DATASET_SENSOR_DIR, exist_ok=True)
+        os.makedirs(self.RESULT_SAVE_DIR, exist_ok=True)
+
+# Instantiate the global singleton
+config = Config()
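
The two derived values are worth a quick sanity check; with the defaults above (a 40-minute window resampled to 20-second points) they work out as below:

```python
# Derived window parameters under the default values above.
WINDOW_DURATION_MIN = 40      # window length in minutes
TARGET_SAMPLE_INTERVAL = 20   # seconds per resampled point

POINTS_PER_WINDOW = int((WINDOW_DURATION_MIN * 60) / TARGET_SAMPLE_INTERVAL)
WINDOW_STEP = POINTS_PER_WINDOW // 2

assert POINTS_PER_WINDOW == 120   # 2400 s / 20 s
assert WINDOW_STEP == 60          # windows overlap by 50%
```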

+ 282 - 0
models/Dynamic_anomaly_diagnosis/data_processing.py

@@ -0,0 +1,282 @@
+# -*- coding: utf-8 -*-
+"""
+data_processing.py: Layer 1 - anomaly quantification
+This module turns massive, raw sensor time series (possibly with missing values)
+into a standardized 0-1 anomaly-score matrix the system can understand.
+Core logic: downsampling, dual anomaly scoring, and sliding-window aggregation.
+"""
+import pandas as pd
+import numpy as np
+import os
+from tqdm import tqdm
+from config import config
+import gc
+from joblib import Parallel, delayed
+import multiprocessing
+
+def _process_single_file_task(file_idx, file_path, sensor_list, target_interval):
+    """
+    Read-and-downsample task for a single file (runs in a worker process)
+    """
+    if not os.path.exists(file_path):
+        return None
+    
+    try:
+        # 1. Fast read
+        df_temp = pd.read_csv(file_path, index_col=None, low_memory=False)
+        if df_temp.empty:
+            return None
+            
+        # 2. Timestamp parsing (mixed formats)
+        time_col = df_temp.columns[0]
+        df_temp[time_col] = pd.to_datetime(
+            df_temp[time_col], 
+            format='mixed', 
+            errors='coerce'
+        )
+        df_temp = df_temp.dropna(subset=[time_col]).set_index(time_col)
+        
+        # 3. Column filtering
+        valid_cols = [c for c in df_temp.columns if c in sensor_list]
+        if not valid_cols:
+            return None
+            
+        df_valid = df_temp[valid_cols]
+        
+        # 4. Immediate downsampling (4s -> 20s) & dtype conversion
+        df_resampled = df_valid.resample(f"{target_interval}s").mean()
+        df_resampled = df_resampled.astype(np.float32)
+        
+        return df_resampled
+        
+    except Exception as e:
+        print(f"文件 {file_idx} 处理出错: {e}")
+        return None
+
+def _calculate_window_chunk(start_indices, values, win_len, valid_ratio, threshold_val=0.95):
+    """
+    Window-computation chunk (runs in a worker process).
+    Packs consecutive time points into windows (e.g. 40 min = 120 points) and computes each window's aggregate anomaly level.
+    """
+    chunk_results = []
+    
+    for start in start_indices:
+        win_data = values[start : start + win_len, :]
+        
+        # Vectorized check of data validity within the window (fraction of non-NaN points)
+        valid_counts = np.sum(~np.isnan(win_data), axis=0)
+        valid_ratios = valid_counts / win_len
+        
+        win_res = np.zeros(win_data.shape[1], dtype=np.float32)
+        valid_mask = valid_ratios >= valid_ratio
+        
+        if np.any(valid_mask):
+            # Take the window's 95th percentile as its representative anomaly score, filtering out sporadic isolated noise
+            quantile_scores = np.nanquantile(win_data[:, valid_mask], threshold_val, axis=0)
+            win_res[valid_mask] = quantile_scores
+            
+        # Clamp scores to the 0-1 range
+        chunk_results.append(np.clip(win_res, 0, 1))
+        
+    return chunk_results
+
+class DataAnomalyProcessor:
+    def __init__(self):
+        self.threshold_path = config.THRESHOLD_FILENAME
+        self.threshold_df = self._load_thresholds()
+        self.sensor_list = self.threshold_df['ID'].tolist()
+        self.threshold_dict = self.threshold_df.set_index('ID').to_dict('index')
+        
+        # Parallelism settings (intended to match the available CPU cores)
+        self.n_jobs_io = 24  
+        self.n_jobs_cpu = 64 
+        
+    def _load_thresholds(self):
+        """加载阈值表"""
+        try:
+            df = pd.read_excel(self.threshold_path)
+        except Exception as e:
+            raise ValueError(f"读取阈值表失败: {e}")
+
+        required_cols = ['ID', 'Direction', 'Hard_min', 'Hard_max', 'Good_min', 'Good_max']
+        for col in required_cols:
+            if col not in df.columns:
+                for c in df.columns:
+                    if c.lower() == col.lower():
+                        df.rename(columns={c: col}, inplace=True)
+                        break
+        
+        cols_to_numeric = ['Hard_min', 'Hard_max', 'Good_min', 'Good_max', 'Alarm_time']
+        for c in cols_to_numeric:
+            if c in df.columns:
+                df[c] = pd.to_numeric(df[c], errors='coerce')
+        
+        df['Hard_min'] = df['Hard_min'].fillna(-np.inf)
+        df['Hard_max'] = df['Hard_max'].fillna(np.inf)
+        df['Good_min'] = df['Good_min'].fillna(-np.inf)
+        df['Good_max'] = df['Good_max'].fillna(np.inf)
+        df['Alarm_time'] = df['Alarm_time'].fillna(60)
+        return df
+
+    def _calculate_point_score_vectorized(self, series, sensor_name):
+        """双重异常得分计算(绝对阈值 + 动态MAD)"""
+        # 向量化计算逻辑
+        if sensor_name not in self.threshold_dict:
+            return pd.Series(0.0, index=series.index, dtype=np.float32)
+        
+        info = self.threshold_dict[sensor_name]
+        vals = series.astype(np.float32).copy()
+        
+        # 1. Hard physical-threshold filter: values beyond physical limits are treated as invalid / sensor faults and set to NaN
+        mask_invalid = (vals < info['Hard_min']) | (vals > info['Hard_max'])
+        vals[mask_invalid] = np.nan
+        
+        # If everything is NaN, return all zeros
+        if vals.isna().all(): 
+            return pd.Series(0.0, index=vals.index, dtype=np.float32)
+        
+        # ==================== (A) Absolute-threshold score ====================
+        abs_scores = pd.Series(0.0, index=vals.index, dtype=np.float32)
+        direction = str(info['Direction']).strip().lower()
+        
+        # Low-side anomaly: when the value sits below Good_min but above Hard_min, linearly interpolate the anomaly degree (0-1)
+        if direction in ['low', 'both']:
+            mask_low = (vals < info['Good_min']) & (vals >= info['Hard_min'])
+            denom = info['Good_min'] - info['Hard_min']
+            if denom > 1e-5:
+                abs_scores[mask_low] = (info['Good_min'] - vals[mask_low]) / denom
+            else:
+                abs_scores[mask_low] = 1.0
+        
+        # High-side anomaly: when the value sits above Good_max but below Hard_max
+        if direction in ['high', 'both']:
+            mask_high = (vals > info['Good_max']) & (vals <= info['Hard_max'])
+            denom = info['Hard_max'] - info['Good_max']
+            if denom > 1e-5:
+                abs_scores[mask_high] = (vals[mask_high] - info['Good_max']) / denom
+            else:
+                abs_scores[mask_high] = 1.0
+        
+        # Weight by alarm time: the faster an alarm is required, the more the anomaly score is amplified
+        alarm_t = max(info['Alarm_time'], 1.0)
+        time_weight = 1.0 + (30.0 / alarm_t) 
+        abs_scores = (abs_scores * time_weight).clip(0, 1)
+        
+        # ==================== (B) Dynamic MAD score ====================
+        # MAD (median absolute deviation) sensitively catches sudden drops/spikes without relying on hand-set thresholds
+        # Use the median over a recent history window as the "dynamic baseline"
+        rolling_median = vals.rolling(
+            window=config.MAD_HISTORY_WINDOW, 
+            min_periods=1
+        ).median()
+        
+        # Absolute deviation |x_i - Median|
+        abs_deviation = (vals - rolling_median).abs()
+        
+        # MAD = Median(|x_i - Median|)
+        rolling_mad = abs_deviation.rolling(
+            window=config.MAD_HISTORY_WINDOW, 
+            min_periods=1
+        ).median()
+        
+        # Dynamic score: map the deviation into 0-1 by dividing by (MAD_THRESHOLD * MAD)
+        # Add 1e-5 to avoid division by zero
+        dyn_scores = (abs_deviation / (rolling_mad * config.MAD_THRESHOLD + 1e-5)).clip(0, 1)
+        
+        # ==================== (C) Weighted combined score ====================
+        final_scores = (config.ABSOLUTE_SCORE_WEIGHT * abs_scores) + (config.DYNAMIC_SCORE_WEIGHT * dyn_scores)
+        
+        # Restore NaN for the invalid data
+        final_scores[mask_invalid] = np.nan 
+        
+        return final_scores
+
+    def process(self):
+        """
+        Main pipeline: file reading -> downsampling -> anomaly scoring -> window aggregation -> dataset splitting
+        """
+        print(f">>> [Step 1] 数据处理启动 | 检测到 CPU 核心数: {multiprocessing.cpu_count()}")
+        
+        # 1. Parallel reading and downsampling
+        file_indices = list(range(config.SENSOR_FILE_NUM_RANGE[0], config.SENSOR_FILE_NUM_RANGE[1] + 1))
+        file_paths = [os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}{i}.csv") for i in file_indices]
+        
+        print(f"正在并行读取 {len(file_indices)} 个文件 (并发数: {self.n_jobs_io})...")
+        
+        # Run _process_single_file_task in parallel via joblib
+        results = Parallel(n_jobs=self.n_jobs_io, backend="loky", verbose=5)(
+            delayed(_process_single_file_task)(
+                idx, path, self.sensor_list, config.TARGET_SAMPLE_INTERVAL
+            ) for idx, path in zip(file_indices, file_paths)
+        )
+        
+        # Drop None results
+        all_series_list = [r for r in results if r is not None]
+        
+        if not all_series_list:
+            raise ValueError("没有读取到有效数据")
+
+        # 2. Merge the data
+        print("Concatenating data along the time axis...")
+        resampled_df = pd.concat(all_series_list, axis=0).sort_index()
+        resampled_df = resampled_df[~resampled_df.index.duplicated(keep='first')]
+        
+        # Free memory
+        del all_series_list, results
+        gc.collect()
+
+        # 3. Point-level scores
+        print("Computing point-level anomaly scores...")
+        scores_dict = {}
+        for sensor in tqdm(self.sensor_list, desc="Scoring"):
+            if sensor in resampled_df.columns:
+                scores_dict[sensor] = self._calculate_point_score_vectorized(resampled_df[sensor], sensor)
+            else:
+                scores_dict[sensor] = pd.Series(np.nan, index=resampled_df.index, dtype=np.float32)
+        
+        point_score_df = pd.DataFrame(scores_dict)
+        point_score_df = point_score_df[self.sensor_list] # align column order
+        
+        # 4. Parallel window aggregation (CPU-bound)
+        print(f"Computing window quantiles in parallel (workers: {self.n_jobs_cpu})...")
+        
+        values = point_score_df.values
+        win_len = config.POINTS_PER_WINDOW
+        step = config.WINDOW_STEP
+        total_len = len(values)
+        
+        if total_len < win_len:
+             return np.array([]), np.array([]), self.threshold_df
+        
+        # Generate the start index of every window
+        all_start_indices = list(range(0, total_len - win_len, step))
+        
+        # Split the indices into chunks, one chunk per CPU worker
+        num_chunks = self.n_jobs_cpu * 4
+        chunk_size = max(1, len(all_start_indices) // num_chunks)
+        chunks = [all_start_indices[i:i + chunk_size] for i in range(0, len(all_start_indices), chunk_size)]
+        
+        # Parallel computation
+        chunk_results = Parallel(n_jobs=self.n_jobs_cpu, backend="loky", verbose=5)(
+            delayed(_calculate_window_chunk)(
+                chunk, values, win_len, config.VALID_DATA_RATIO
+            ) for chunk in chunks
+        )
+        
+        # Merge the results
+        window_scores = [item for sublist in chunk_results for item in sublist]
+        
+        final_scores = np.nan_to_num(np.array(window_scores, dtype=np.float32), nan=0.0)
+        
+        # 5. Split the dataset
+        total_samples = final_scores.shape[0]
+        if total_samples == 0:
+            raise ValueError("窗口数量为0")
+            
+        split_idx = int(total_samples * config.TRAIN_TEST_SPLIT)
+        train_scores = final_scores[:split_idx]
+        test_scores = final_scores[split_idx:]
+        
+        print(f"处理完成: 训练集 {len(train_scores)} | 测试集 {len(test_scores)}")
+        
+        return train_scores, test_scores, self.threshold_df
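
For intuition, here is a toy walk-through of the combined score on a synthetic series, under assumed thresholds (Good range [2, 8], Hard range [0, 10], high side only) and with the alarm-time weighting left out for brevity:

```python
# Toy version of the dual score: absolute interpolation + rolling MAD.
import numpy as np
import pandas as pd

vals = pd.Series([5.0, 5.1, 4.9, 5.0, 9.0], dtype=np.float32)

# (A) absolute score: linear interpolation from Good_max=8 toward Hard_max=10
abs_scores = ((vals - 8.0) / (10.0 - 8.0)).clip(0, 1)

# (B) dynamic MAD score against a rolling-median baseline
med = vals.rolling(window=360, min_periods=1).median()
dev = (vals - med).abs()
mad = dev.rolling(window=360, min_periods=1).median()
dyn_scores = (dev / (mad * 3.0 + 1e-5)).clip(0, 1)

# (C) weighted combination with the default 0.6 / 0.4 weights
final = 0.6 * abs_scores + 0.4 * dyn_scores
print(final.round(3).tolist())
# The final jump to 9.0 scores 0.5 on the absolute axis (halfway from
# Good_max to Hard_max) and saturates the MAD axis, landing near 0.7.
```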

BIN
models/Dynamic_anomaly_diagnosis/longting/abnormal_link.xlsx


+ 55 - 0
models/Dynamic_anomaly_diagnosis/longting/config.yaml

@@ -0,0 +1,55 @@
+# longting/config.yaml
+project:
+  plant_name: "longting"
+
+files:
+  threshold_filename: "sensor_threshold.xlsx"
+  abnormal_link_filename: "abnormal_link.xlsx"
+  model_filename: "ppo_tracing_model.pth"
+  test_result_filename: "Final_Test_Report.xlsx"
+  sensor_file_prefix: "data_process_"
+
+data_processing:
+  sensor_file_num_range: [1, 11]
+  original_sample_interval: 4
+  target_sample_interval: 20
+  window_duration_min: 40
+  valid_data_ratio: 0.6
+  window_anomaly_threshold: 0.2
+  train_test_split: 0.8
+  trigger_score_thresh: 0.5
+  absolute_score_weight: 0.6
+  dynamic_score_weight: 0.4
+  mad_history_window: 360
+  mad_threshold: 3.0
+
+sensors:
+  keyword_layer: "One_layer"
+  keyword_device: "Device"
+  trigger_sensors:
+    - "UF1Per"
+    - "UF2Per"
+    - "ns=3;s=RO1_1D_YC"
+    - "ns=3;s=RO2_1D_YC"
+    - "ns=3;s=RO1_2D_YC"
+    - "ns=3;s=RO2_2D_YC"
+    - "ns=3;s=PUBLIC_BY_REAL_1"
+    - "ns=3;s=PUBLIC_BY_REAL_2"
+    - "ns=3;s=1#RO_SDCSFLOW_O"
+    - "ns=3;s=2#RO_SDCSFLOW_O"
+    - "ROHSL"
+    - "RO1_TYL"
+    - "RO2_TYL"
+
+rl_params:
+  min_path_length: 3
+  max_path_length: 6
+  embedding_dim: 64
+  hidden_dim: 256
+  ppo_lr: 0.0003
+  ppo_gamma: 0.90
+  ppo_eps_clip: 0.2
+  ppo_k_epochs: 10
+  ppo_batch_size: 64
+  bc_epochs: 20000
+  rl_episodes: 20000

BIN
models/Dynamic_anomaly_diagnosis/longting/ppo_tracing_model.pth


BIN
models/Dynamic_anomaly_diagnosis/longting/sensor_threshold.xlsx


+ 39 - 0
models/Dynamic_anomaly_diagnosis/main.py

@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+"""main.py: 主运行文件"""
+import argparse
+from config import config
+
+def main():
+    parser = argparse.ArgumentParser(description="Water-plant diagnosis model training")
+    parser.add_argument('-p', '--plant', type=str, required=True, help="Plant name (its folder name), e.g.: longting")
+    args = parser.parse_args()
+    
+    print(f"[*] 正在初始化工作空间: {args.plant}")
+    config.load(args.plant)
+
+    # Import the shared logic only after config has been initialized
+    from data_processing import DataAnomalyProcessor
+    from causal_structure import CausalStructureBuilder
+    from rl_tracing import RLTrainer
+    
+    # 1. Data layer
+    processor = DataAnomalyProcessor()
+    train_scores, test_scores, threshold_df = processor.process()
+    
+    # 2. Causal layer
+    builder = CausalStructureBuilder(threshold_df)
+    causal_graph = builder.build()
+    
+    # 3. Reinforcement-learning layer
+    trainer = RLTrainer(causal_graph, train_scores, threshold_df)
+    trainer.pretrain_bc()   
+    trainer.train_ppo()     
+    trainer.save_model()
+    
+    # 4. Evaluation
+    trainer.evaluate(test_scores)
+    
+    print(f"\n[Success] {args.plant} 水厂训练与评估完毕!模型保存在: {config.MODEL_FILE_PATH}")
+
+if __name__ == "__main__":
+    main()

+ 504 - 0
models/Dynamic_anomaly_diagnosis/rl_tracing.py

@@ -0,0 +1,504 @@
+# -*- coding: utf-8 -*-
+"""
+rl_tracing.py: reinforcement-learning link-level anomaly tracing
+PPO (Proximal Policy Optimization) on an Actor-Critic architecture.
+Combines behavior cloning of expert experience (imitation learning) with autonomous exploration (reinforcement learning)
+to walk upstream from a "trigger variable" to the "root-cause variable".
+"""
+import torch
+import torch.nn as nn
+import torch.optim as optim
+import torch.nn.functional as F
+from torch.distributions import Categorical
+import numpy as np
+import pandas as pd
+import os
+from tqdm import tqdm
+from config import config
+
+# ----------------- 1. Environment -----------------
+class CausalTracingEnv:
+    """
+    RL interaction environment.
+    Defines the agent's state, action space, and reward function.
+    """
+    def __init__(self, causal_graph, window_scores, threshold_df, expert_knowledge=None):
+        self.sensor_list = causal_graph['sensor_list']
+        self.map = causal_graph['sensor_to_idx']
+        self.idx_to_sensor = {v: k for k, v in self.map.items()}
+        self.adj = causal_graph['adj_matrix']
+        self.scores = window_scores
+        
+        # Expert knowledge base of historical anomaly links
+        self.expert_knowledge = expert_knowledge if expert_knowledge else {}
+        self.num_sensors = len(self.sensor_list)
+        
+        # Parse each sensor's layer and owning device, used to mask illegal actions
+        self.node_props = {} 
+        col_one_layer = self._find_col(threshold_df, config.KEYWORD_LAYER)
+        col_device = self._find_col(threshold_df, config.KEYWORD_DEVICE)
+        
+        df_indexed = threshold_df.set_index('ID')
+        dict_one = df_indexed[col_one_layer].to_dict() if col_one_layer else {}
+        dict_dev = df_indexed[col_device].to_dict() if col_device else {}
+        
+        for name, idx in self.map.items():
+            l_val = dict_one.get(name, -1)
+            try: l_val = int(l_val)
+            except: l_val = 0 
+            d_val = dict_dev.get(name, None)
+            d_val = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None
+            self.node_props[idx] = {'one_layer': l_val, 'device': d_val}
+
+        # Initialize the per-episode state variables
+        self.current_window_idx = 0
+        self.current_node_idx = 0
+        self.prev_node_idx = 0
+        self.trigger_node_idx = 0
+        self.path = []
+        self.current_expert_paths = []
+        self.target_roots = set()
+        
+    def _find_col(self, df, keyword):
+        if keyword in df.columns: return keyword
+        for c in df.columns:
+            if c.lower() == keyword.lower(): return c
+        return None
+
+    def reset(self, force_window_idx=None, force_trigger=None):
+        """
+        Reset the environment and start a new episode.
+        Randomly pick an anomalous time window and an alarming trigger sensor as the starting point.
+        """
+        if force_window_idx is not None:
+            self.current_window_idx = force_window_idx
+            t_name = force_trigger
+        else:
+            found = False
+            # Try to randomly find a time window in which a trigger variable is anomalous
+            for _ in range(100):
+                w_idx = np.random.randint(len(self.scores))
+                win_scores = self.scores[w_idx]
+                candidates = []
+                for t_name in config.TRIGGER_SENSORS:
+                    if t_name in self.map:
+                        idx = self.map[t_name]
+                        # A trigger variable becomes a candidate start only when its score exceeds the trigger threshold
+                        if win_scores[idx] > config.TRIGGER_SCORE_THRESH:
+                            candidates.append(t_name)
+                if candidates:
+                    self.current_window_idx = w_idx
+                    t_name = np.random.choice(candidates)
+                    found = True
+                    break
+            if not found:
+                self.current_window_idx = np.random.randint(len(self.scores))
+                t_name = list(self.map.keys())[0]
+                
+        # Initialize the path state
+        self.current_node_idx = self.map.get(t_name, 0)
+        self.trigger_node_idx = self.current_node_idx
+        self.prev_node_idx = self.current_node_idx
+        self.path = [self.current_node_idx]
+        
+        # Load the matching expert knowledge as this episode's target (used for reward computation)
+        self.target_roots = set()
+        self.current_expert_paths = []
+        if self.current_node_idx in self.expert_knowledge:
+            entry = self.expert_knowledge[self.current_node_idx]
+            self.target_roots = entry['roots']
+            self.current_expert_paths = entry['paths']
+            
+        return self._get_state()
+    
+    def _get_state(self):
+        """
+        Current observation (state).
+        Packs discrete IDs with continuous anomaly scores / layer info for the network to extract features from.
+        """
+        curr_score = self.scores[self.current_window_idx, self.current_node_idx]
+        prev_score = self.scores[self.current_window_idx, self.prev_node_idx]
+        curr_layer = self.node_props[self.current_node_idx]['one_layer'] / 20.0
+        
+        return (
+            torch.LongTensor([self.current_node_idx, self.prev_node_idx, self.trigger_node_idx]), 
+            torch.FloatTensor([curr_score, prev_score, curr_layer])
+        )
+    
+    def get_valid_actions(self, curr_idx):
+        """
+        Action-masking mechanism.
+        Uses the causal graph and business rules to tell the agent which neighbor nodes this step may visit.
+        """
+        # Physically adjacent nodes from the adjacency matrix
+        neighbors = np.where(self.adj[curr_idx] == 1)[0]
+        curr_props = self.node_props[curr_idx]
+        curr_l, curr_d = curr_props['one_layer'], curr_props['device']
+        valid = []
+        for n in neighbors:
+            if n in self.path: continue 
+            tgt_props = self.node_props[n]
+            tgt_l, tgt_d = tgt_props['one_layer'], tgt_props['device']
+            
+            # Double safety: re-check the layer and device constraints
+            if curr_l != 0 and tgt_l != 0:
+                if not ((tgt_l == curr_l) or (tgt_l == curr_l - 1)): continue
+            if (curr_d is not None) and (tgt_d is not None):
+                if curr_d != tgt_d: continue
+            valid.append(n)
+        return np.array(valid)
+    
+    def step(self, action_idx):
+        """
+        The agent takes one action; the environment returns the new state and the reward.
+        The reward function is the agent's value system and determines its behavior.
+        """
+        prev = self.current_node_idx
+        self.prev_node_idx = prev
+        self.current_node_idx = action_idx
+        self.path.append(action_idx)
+        
+        score_curr = self.scores[self.current_window_idx, self.current_node_idx]
+        
+        reward = 0.0
+        done = False
+        
+        # [Reward 1: imitate expert experience]
+        # Positive feedback for stepping onto a node recorded in historical anomalies
+        in_expert_nodes = False
+        for e_path in self.current_expert_paths:
+            if action_idx in e_path:
+                in_expert_nodes = True
+                break
+        
+        if in_expert_nodes: reward += 2.0
+        else: reward -= 0.2    # small penalty for exploring unknown nodes, discouraging aimless wandering
+            
+        # [Reward 2: hitting the final root cause]
+        # Finding the true culprit earns a large reward and ends the episode
+        if action_idx in self.target_roots:
+            reward += 10.0
+            done = True
+        
+        # [Reward 3: anomaly-gradient guidance]
+        # Encourage moving toward ever-higher anomaly scores (anomaly propagation decays downstream)
+        score_prev = self.scores[self.current_window_idx, prev]
+        diff = score_curr - score_prev
+        if diff > 0: reward += diff * 3.0
+        else: reward -= 0.5
+            
+        # [Penalty 1: path too long] prevents divergence
+        if len(self.path) >= config.MAX_PATH_LENGTH:
+            done = True
+            if action_idx not in self.target_roots: reward -= 5.0
+            
+        # [Penalty 2: entering a normal region] a very low anomaly score means the search went the wrong way
+        if score_curr < 0.15 and len(self.path) > 3:
+            done = True
+            reward -= 2.0
+            
+        return self._get_state(), reward, done, {}
+
+# ----------------- 2. Network architecture (Actor-Critic) -----------------
+class TargetDrivenActorCritic(nn.Module):
+    """
+    The agent's "brain": an Actor-Critic two-headed network.
+    The Actor decides "where to go next" (policy); the Critic estimates "how good the current situation is" (value).
+    """
+    def __init__(self, num_sensors, embedding_dim=64, hidden_dim=256):
+        super().__init__()
+        self.node_emb = nn.Embedding(num_sensors, embedding_dim)
+        input_dim = (embedding_dim * 3) + 3 
+        
+        self.shared_net = nn.Sequential(
+            nn.Linear(input_dim, hidden_dim),
+            nn.ReLU(),
+            nn.LayerNorm(hidden_dim),
+            nn.Linear(hidden_dim, hidden_dim),
+            nn.ReLU()
+        )
+        self.actor = nn.Linear(hidden_dim, num_sensors)
+        self.critic = nn.Linear(hidden_dim, 1)
+        
+    def forward(self, int_data, float_data):
+        curr_emb = self.node_emb(int_data[:, 0])
+        prev_emb = self.node_emb(int_data[:, 1])
+        trig_emb = self.node_emb(int_data[:, 2]) 
+        x = torch.cat([curr_emb, prev_emb, trig_emb, float_data], dim=1)
+        feat = self.shared_net(x)
+        return self.actor(feat), self.critic(feat)
+
+# ----------------- 3. Trainer -----------------
+class RLTrainer:
+    def __init__(self, causal_graph, train_scores, threshold_df):
+        self.sensor_map = causal_graph['sensor_to_idx']
+        self.idx_to_sensor = {v: k for k, v in self.sensor_map.items()}
+        self.threshold_df = threshold_df
+        self.causal_graph = causal_graph
+        self.expert_knowledge, self.bc_samples, _ = self._load_expert_data()
+        
+        self.env = CausalTracingEnv(causal_graph, train_scores, threshold_df, self.expert_knowledge)
+        self.model = TargetDrivenActorCritic(self.env.num_sensors, config.EMBEDDING_DIM, config.HIDDEN_DIM)
+        self.optimizer = optim.Adam(self.model.parameters(), lr=config.PPO_LR)
+        
+    def _load_expert_data(self):
+        path = config.ABNORMAL_LINK_FILENAME
+        kb_data = {} 
+        bc_data = [] 
+        if not os.path.exists(path): return kb_data, bc_data, None
+        
+        df = pd.read_excel(path)
+        for _, row in df.iterrows():
+            link = str(row.get('Link Path', ''))
+            if not link: continue
+            nodes_str = [n.strip() for n in link.replace('→', '->').split('->')]
+            path_nodes = nodes_str[::-1]
+            
+            ids = []
+            valid = True
+            for n in path_nodes:
+                if n in self.sensor_map: ids.append(self.sensor_map[n])
+                else: valid = False; break
+            if not valid or len(ids)<2: continue
+            
+            trigger_id = ids[0]
+            root_id = ids[-1]
+            
+            if trigger_id not in kb_data:
+                kb_data[trigger_id] = {'paths': [], 'roots': set(), 'logic': row.get('Process Logic Basis', '')}
+            kb_data[trigger_id]['paths'].append(ids)
+            kb_data[trigger_id]['roots'].add(root_id)
+            
+            for i in range(len(ids) - 1):
+                curr = ids[i]
+                prev = ids[max(0, i-1)]
+                nxt = ids[i+1]
+                bc_data.append(((curr, prev, trigger_id), nxt))
+                
+        return kb_data, bc_data, df
+
+    def pretrain_bc(self):
+        """
+        Stage 1: Behavior Cloning (BC) pretraining.
+        The agent rote-learns the expert's known anomaly links, giving it basic domain knowledge.
+        Uses a standard supervised cross-entropy loss.
+        """
+        if not self.bc_samples: return
+        print(f"\n>>> [Step 3.1] 启动BC预训练 ({config.BC_EPOCHS}轮)...")
+        states_int = torch.LongTensor([list(s) for s, a in self.bc_samples])
+        actions = torch.LongTensor([a for s, a in self.bc_samples])
+        states_float = torch.zeros((len(states_int), 3))
+        states_float[:, 0] = 0.9 
+        states_float[:, 1] = 0.8
+        
+        loss_fn = nn.CrossEntropyLoss()
+        pbar = tqdm(range(config.BC_EPOCHS), desc="BC Training")
+        for epoch in pbar:
+            logits, _ = self.model(states_int, states_float)
+            loss = loss_fn(logits, actions)
+            self.optimizer.zero_grad()
+            loss.backward()
+            self.optimizer.step()
+            if epoch%100==0: pbar.set_postfix({'Loss': f"{loss.item():.4f}"})
+
+    def train_ppo(self):
+        """
+        Stage 2: PPO autonomous exploration.
+        The agent trial-and-errors through real data with varying anomaly-score distributions,
+        discovering potential new anomaly links not yet registered in the expert base.
+        """
+        print(f"\n>>> [Step 3.2] 启动PPO训练 ({config.RL_EPISODES}轮)...")
+        pbar = tqdm(range(config.RL_EPISODES), desc="PPO Training")
+        rewards_hist = []
+        for _ in pbar:
+            state_data = self.env.reset()
+            done = False
+            ep_r = 0
+            b_int, b_float, b_act, b_lp, b_rew, b_mask = [], [], [], [], [], []
+            
+            while not done:
+                s_int = state_data[0].unsqueeze(0)
+                s_float = state_data[1].unsqueeze(0)
+                valid = self.env.get_valid_actions(s_int[0, 0].item())
+                if len(valid) == 0: break
+                
+                logits, _ = self.model(s_int, s_float)
+                mask = torch.full_like(logits, -1e9)
+                mask[0, valid] = 0
+                dist = Categorical(F.softmax(logits+mask, dim=-1))
+                action = dist.sample()
+                
+                next_s, r, done, _ = self.env.step(action.item())
+                b_int.append(s_int); b_float.append(s_float)
+                b_act.append(action); b_lp.append(dist.log_prob(action))
+                b_rew.append(r); b_mask.append(1-done)
+                state_data = next_s
+                ep_r += r
+            
+            if len(b_rew) > 1:
+                self._update_ppo(b_int, b_float, b_act, b_lp, b_rew, b_mask)
+            
+            rewards_hist.append(ep_r)
+            if len(rewards_hist)>50: rewards_hist.pop(0)
+            pbar.set_postfix({'AvgR': f"{np.mean(rewards_hist):.2f}"})
+
+    def _update_ppo(self, b_int, b_float, b_act, b_lp, b_rew, b_mask):
+        """PPO 核心公式计算:折扣回报计算、优势函数、Clip 截断防止策略更新幅度过大"""
+        returns = []
+        R = 0
+        for r, m in zip(reversed(b_rew), reversed(b_mask)):
+            R = r + config.PPO_GAMMA * R * m
+            returns.insert(0, R)
+        returns = torch.tensor(returns)
+        
+        if returns.numel() > 1 and returns.std() > 1e-5:
+            returns = (returns - returns.mean()) / (returns.std() + 1e-5)
+        elif returns.numel() > 1:
+             returns = returns - returns.mean()
+        
+        s_int = torch.cat(b_int)
+        s_float = torch.cat(b_float)
+        act = torch.stack(b_act)
+        old_lp = torch.stack(b_lp).detach()
+        
+        for _ in range(config.PPO_K_EPOCHS):
+            logits, vals = self.model(s_int, s_float)
+            dist = Categorical(logits=logits)
+            new_lp = dist.log_prob(act)
+            ratio = torch.exp(new_lp - old_lp)
+            
+            surr1 = ratio * returns
+            surr2 = torch.clamp(ratio, 1-config.PPO_EPS_CLIP, 1+config.PPO_EPS_CLIP) * returns
+            
+            v_pred = vals.squeeze()
+            if v_pred.shape != returns.shape:
+                v_pred = v_pred.view(-1)
+                returns = returns.view(-1)
+                
+            loss = -torch.min(surr1, surr2).mean() + 0.5 * F.mse_loss(v_pred, returns)
+            self.optimizer.zero_grad()
+            loss.backward()
+            self.optimizer.step()
+
+    def evaluate(self, test_scores):
+        """
+        Step 4: model validation and evaluation.
+        Runs the full pipeline on unseen test data to assess diagnostic accuracy and new-pattern discovery,
+        and exports the results as a structured Excel report.
+        """
+        print("\n>>> [Step 4] 评估测试集...")
+        self.model.eval()
+        results = []
+        
+        cnt_detected = 0
+        cnt_kb_covered = 0
+        cnt_path_match = 0
+        cnt_root_match = 0
+        cnt_new = 0
+        
+        env = CausalTracingEnv(self.causal_graph, test_scores, self.threshold_df, self.expert_knowledge)
+        
+        for win_idx in range(len(test_scores)):
+            scores = test_scores[win_idx]
+            active = []
+            for t_name in config.TRIGGER_SENSORS:
+                if t_name in self.sensor_map:
+                    idx = self.sensor_map[t_name]
+                    if scores[idx] > config.TRIGGER_SCORE_THRESH:
+                        active.append((t_name, idx))
+            
+            for t_name, t_idx in active:
+                cnt_detected += 1
+                state_data = env.reset(force_window_idx=win_idx, force_trigger=t_name)
+                path_idxs = [t_idx]
+                done = False
+                
+                while not done:
+                    s_int = state_data[0].unsqueeze(0)
+                    s_float = state_data[1].unsqueeze(0)
+                    valid = env.get_valid_actions(path_idxs[-1])
+                    if len(valid) == 0: break
+                    
+                    logits, _ = self.model(s_int, s_float)
+                    mask = torch.full_like(logits, -1e9)
+                    mask[0, valid] = 0
+                    act = torch.argmax(logits + mask, dim=1).item()
+                    state_data, _, done, _ = env.step(act)
+                    path_idxs.append(act)
+                    if len(path_idxs) >= config.MAX_PATH_LENGTH: done = True
+                
+                path_names = [self.idx_to_sensor[i] for i in path_idxs]
+                root = path_names[-1]
+                root_score = scores[self.sensor_map[root]]
+                
+                match_status = "未定义"
+                logic = ""
+                
+                if t_idx in self.expert_knowledge:
+                    cnt_kb_covered += 1
+                    entry = self.expert_knowledge[t_idx]
+                    logic = entry.get('logic', '')
+                    
+                    real_roots = [self.idx_to_sensor[r] for r in entry['roots']]
+                    rm = False
+                    for p_node in path_names:
+                        if p_node in real_roots:
+                            rm = True
+                            break
+                    
+                    pm = False
+                    path_set = set(path_idxs)
+                    for exp_p in entry['paths']:
+                        exp_set = set(exp_p)
+                        intersection = len(path_set.intersection(exp_set))
+                        union = len(path_set.union(exp_set))
+                        if union > 0 and (intersection / union) >= 0.6:
+                            pm = True
+                            break
+                    
+                    if pm:
+                        match_status = "路径吻合"
+                        cnt_path_match += 1
+                        cnt_root_match += 1
+                    elif rm:
+                        match_status = "仅根因吻合"
+                        cnt_root_match += 1
+                    else:
+                        match_status = "不吻合"
+                else:
+                    match_status = "新链路"
+                    cnt_new += 1
+                
+                results.append({
+                    "窗口ID": win_idx,
+                    "诱发变量": t_name,
+                    "溯源路径": "->".join(path_names),
+                    "根因变量": root,
+                    "根因异常分": f"{root_score:.3f}",
+                    "是否知识库": "是" if t_idx in self.expert_knowledge else "否",
+                    "匹配情况": match_status,
+                    "机理描述": logic
+                })
+
+        denom = max(cnt_kb_covered, 1)
+        summary = [
+            {"指标": "检测到的总异常样本数", "数值": cnt_detected},
+            {"指标": "知识库覆盖的样本数", "数值": cnt_kb_covered},
+            {"指标": "异常链路准确率", "数值": f"{cnt_path_match/denom:.2%}"},
+            {"指标": "根因准确率", "数值": f"{cnt_root_match/denom:.2%}"},
+            {"指标": "新发现异常模式数", "数值": cnt_new}
+        ]
+        
+        save_path = f"{config.RESULT_SAVE_DIR}/{config.TEST_RESULT_FILENAME}"
+        with pd.ExcelWriter(save_path, engine='openpyxl') as writer:
+            pd.DataFrame(summary).to_excel(writer, sheet_name='Sheet1_Summary', index=False)
+            pd.DataFrame(results).to_excel(writer, sheet_name='Sheet2_TestDetails', index=False)
+            
+        print("\n" + "="*50)
+        print(pd.DataFrame(summary).to_string(index=False))
+        print(f"\n文件已保存: {save_path}")
+        print("="*50)
+
+    def save_model(self):
+        path = config.MODEL_FILE_PATH
+        torch.save(self.model.state_dict(), path)
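
As a quick numeric check of `_update_ppo`, the sketch below reruns the discounted-return loop and the clipped surrogate on hand-made numbers (gamma 0.9 and eps_clip 0.2, matching the defaults):

```python
# Discounted returns and the PPO clipped surrogate on toy values.
import torch

rewards = [2.0, -0.2, 10.0]   # e.g. expert-node hit, stray step, root-cause hit
masks = [1, 1, 0]             # 1 - done per step; the last step terminates

R, returns = 0.0, []
for r, m in zip(reversed(rewards), reversed(masks)):
    R = r + 0.9 * R * m
    returns.insert(0, R)
print(returns)                # ~[9.92, 8.8, 10.0]

ratio = torch.tensor([0.7, 1.0, 1.5])       # new/old action-probability ratios
adv = torch.tensor([1.0, 1.0, 1.0])         # normalized returns stand in for advantages
surr1 = ratio * adv
surr2 = torch.clamp(ratio, 0.8, 1.2) * adv  # clipped to 1 -/+ eps_clip
print((-torch.min(surr1, surr2).mean()).item())
# -0.9667: the 1.5 ratio is capped at 1.2, bounding the update step
```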

+ 296 - 0
models/Dynamic_anomaly_diagnosis/test.py

@@ -0,0 +1,296 @@
+# -*- coding: utf-8 -*-
+"""test.py: 在线诊断接口"""
+import os
+import pandas as pd
+import numpy as np
+import torch
+import argparse
+from config import config
+
+class WaterPlantDiagnoser:
+    def __init__(self):
+        from data_processing import DataAnomalyProcessor
+        from causal_structure import CausalStructureBuilder
+        from rl_tracing import RLTrainer, CausalTracingEnv
+        
+        self.processor = DataAnomalyProcessor()
+        self.sensor_list = self.processor.sensor_list
+        self.threshold_df = self.processor.threshold_df
+        
+        self.builder = CausalStructureBuilder(self.threshold_df)
+        self.causal_graph = self.builder.build()
+        
+        dummy_scores = np.zeros((1, len(self.sensor_list)), dtype=np.float32)
+        self.trainer = RLTrainer(self.causal_graph, dummy_scores, self.threshold_df)
+        self.CausalTracingEnv = CausalTracingEnv # cache the class for later use
+        
+        # Use the paths already assembled by config
+        if not os.path.exists(config.MODEL_FILE_PATH):
+            print(f"[Warning] 未找到模型文件: {config.MODEL_FILE_PATH}。请先执行 main.py 进行训练。")
+        else:
+            state_dict = torch.load(config.MODEL_FILE_PATH, map_location=torch.device('cpu'), weights_only=True)
+            self.trainer.model.load_state_dict(state_dict)
+            print(f"[*] 成功加载模型: {config.MODEL_FILE_PATH}")
+        
+        self.trainer.model.eval()
+        
+    def _preprocess_dataframe(self, df_input):
+        """
+        Strictly reproduces the training-time data-processing logic
+        """
+        try:
+            df = df_input.copy()
+            # 1. Timestamp parsing
+            time_col = df.columns[0]
+            df[time_col] = pd.to_datetime(df[time_col], format='mixed', errors='coerce')
+            df = df.dropna(subset=[time_col]).set_index(time_col)
+            
+            # 2. Column filtering
+            valid_cols = [c for c in df.columns if c in self.sensor_list]
+            if not valid_cols: return None
+            df_valid = df[valid_cols]
+            
+            # 3. Downsampling (4s -> 20s)
+            df_resampled = df_valid.resample(f"{config.TARGET_SAMPLE_INTERVAL}s").mean()
+            df_resampled = df_resampled.astype(np.float32)
+            
+            return df_resampled
+        except Exception as e:
+            print(f"数据预处理错误: {e}")
+            return None
+
+    def api_predict(self, df_raw):
+        """
+        Public API: each call receives the past 2 hours of data and diagnoses the latest 40 minutes
+        """
+        # --- Layer 1: data preprocessing ---
+        df_resampled = self._preprocess_dataframe(df_raw)
+        if df_resampled is None or df_resampled.empty:
+            return {"status": "error", "msg": "数据预处理失败或数据为空"}
+            
+        # --- Layer 1: anomaly scoring (combined absolute + MAD) ---
+        scores_dict = {}
+        for sensor in self.sensor_list:
+            if sensor in df_resampled.columns:
+                scores_dict[sensor] = self.processor._calculate_point_score_vectorized(
+                    df_resampled[sensor], sensor
+                )
+            else:
+                # Strict alignment: use np.nan to stay fully consistent with training
+                scores_dict[sensor] = pd.Series(np.nan, index=df_resampled.index, dtype=np.float32)
+        
+        point_score_df = pd.DataFrame(scores_dict)[self.sensor_list]
+        
+        # --- Layer 1: take the latest 40 minutes for window aggregation and raw-value means ---
+        req_points = config.POINTS_PER_WINDOW  # 40 min = 120 points (20 s interval)
+        total_points = len(point_score_df)
+        
+        if total_points < req_points:
+            return {
+                "status": "warning", 
+                "message": f"传入数据不足40分钟 (当前: {total_points*20}s, 需要: {req_points*20}s)"
+            }
+            
+        # Slice: the last 40 minutes of scores (for the anomaly score) and of raw data (for deviation percentages)
+        latest_40min_scores = point_score_df.iloc[-req_points:].values
+        latest_40min_raw = df_resampled.iloc[-req_points:]
+        # Mean of the raw sensor values over these 40 minutes, shown as the business reference value
+        real_time_values = latest_40min_raw.mean(skipna=True)
+        
+        # Valid-data ratios and the 95th percentile
+        valid_counts = np.sum(~np.isnan(latest_40min_scores), axis=0)
+        valid_ratios = valid_counts / req_points
+        
+        current_window_scores = np.zeros(latest_40min_scores.shape[1], dtype=np.float32)
+        valid_mask = valid_ratios >= config.VALID_DATA_RATIO
+        
+        if np.any(valid_mask):
+            current_window_scores[valid_mask] = np.nanquantile(latest_40min_scores[:, valid_mask], 0.95, axis=0)
+        
+        current_window_scores = np.clip(current_window_scores, 0, 1)
+        
+        # --- Layer 2 & 3: 触发检测与溯源 ---
+        results = []
+        active_triggers = []
+        
+        for t_name in config.TRIGGER_SENSORS:
+            if t_name in self.causal_graph['sensor_to_idx']:
+                idx = self.causal_graph['sensor_to_idx'][t_name]
+                score = current_window_scores[idx]
+                if score > config.TRIGGER_SCORE_THRESH:
+                    active_triggers.append((t_name, idx, score))
+
+        if not active_triggers:
+            return {
+                "status": "normal", 
+                "message": "系统运行正常", 
+            }
+
+        # Build a temporary environment for the tracing run
+        env_scores = current_window_scores.reshape(1, -1)
+        temp_env = self.CausalTracingEnv(self.causal_graph, env_scores, self.threshold_df, self.trainer.expert_knowledge)
+        
+        for t_name, t_idx, t_score in active_triggers:
+            state_data = temp_env.reset(force_window_idx=0, force_trigger=t_name)
+            path_idxs = [t_idx]
+            done = False
+            
+            while not done:
+                s_int = state_data[0].unsqueeze(0)
+                s_float = state_data[1].unsqueeze(0)
+                valid = temp_env.get_valid_actions(path_idxs[-1])
+                
+                if len(valid) == 0: break
+                
+                logits, _ = self.trainer.model(s_int, s_float)
+                mask = torch.full_like(logits, -1e9)
+                mask[0, valid] = 0
+                act = torch.argmax(logits + mask, dim=1).item()
+                
+                state_data, _, done, _ = temp_env.step(act)
+                path_idxs.append(act)
+                if len(path_idxs) >= config.MAX_PATH_LENGTH: done = True
+            
+            path_names = [self.trainer.idx_to_sensor[i] for i in path_idxs]
+            
+            # --- 1 & 2:计算链路有效性与双重偏差量化 ---
+            path_details = []
+            abnormal_other_nodes_count = 0
+            
+            for node_idx, node_name in zip(path_idxs, path_names):
+                node_score = current_window_scores[node_idx]
+                is_node_abnormal = node_score > config.WINDOW_ANOMALY_THRESHOLD
+                
+                # If this is not the trigger variable itself and it meets the anomaly criterion, count it
+                if node_idx != t_idx and is_node_abnormal:
+                    abnormal_other_nodes_count += 1
+                
+                # 1. Current reference value
+                raw_val = real_time_values.get(node_name, np.nan)
+                
+                # 2. Recompute this node's dynamic baseline and bounds from the history in real time
+                dyn_lower, dyn_upper, dyn_med = np.nan, np.nan, np.nan
+                if node_name in df_resampled.columns:
+                    vals = df_resampled[node_name].astype(np.float32)
+                    # Replay the rolling window to reconstruct the MAD history
+                    r_med = vals.rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()
+                    r_mad = (vals - r_med).abs().rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()
+                    
+                    # Take the values at the window's end as the baseline
+                    last_med = r_med.iloc[-1]
+                    last_mad = r_mad.iloc[-1]
+                    dyn_lower = last_med - config.MAD_THRESHOLD * last_mad
+                    dyn_upper = last_med + config.MAD_THRESHOLD * last_mad
+                    dyn_med = last_med
+
+                # 3. Build display strings for the [physical range] and the [dynamic range]
+                phy_str = "未定义"
+                dyn_str = "未定义"
+                node_name_zh = "未知"
+                
+                # Extract the human-readable name
+                if node_name in self.processor.threshold_dict:
+                    info = self.processor.threshold_dict[node_name]
+                    node_name_zh = info.get('Name', 'unknown')
+                
+                if not pd.isna(raw_val) and node_name in self.processor.threshold_dict:
+                    info = self.processor.threshold_dict[node_name]
+                    g_min, g_max = info['Good_min'], info['Good_max']
+                    
+                    # -- Physical-range check --
+                    if raw_val > g_max and g_max != np.inf:
+                        pct = (raw_val - g_max) / (abs(g_max) + 1e-5) * 100
+                        phy_str = f"high by {pct:.1f}% (physical upper limit: {g_max})"
+                    elif raw_val < g_min and g_min != -np.inf:
+                        pct = (g_min - raw_val) / (abs(g_min) + 1e-5) * 100
+                        phy_str = f"low by {pct:.1f}% (physical lower limit: {g_min})"
+                    else:
+                        phy_str = f"normal (physical range: [{g_min}, {g_max}])"
+                        
+                    # -- Dynamic-range check --
+                    if not pd.isna(dyn_lower) and not pd.isna(dyn_upper):
+                        if raw_val > dyn_upper:
+                            dyn_str = f"abnormal surge (recent baseline: {dyn_med:.2f}, dynamic upper bound: {dyn_upper:.2f})"
+                        elif raw_val < dyn_lower:
+                            dyn_str = f"abnormal drop (recent baseline: {dyn_med:.2f}, dynamic lower bound: {dyn_lower:.2f})"
+                        else:
+                            dyn_str = f"steady (recent baseline: {dyn_med:.2f}, dynamic range: [{dyn_lower:.2f}, {dyn_upper:.2f}])"
+
+                # Assemble into clear structured text
+                dev_str = f"current: {raw_val:.2f} | physical status: {phy_str} | dynamic trend: {dyn_str}"
+
+                path_details.append({
+                    "node": node_name,
+                    "name": node_name_zh, 
+                    "anomaly_score": round(float(node_score), 4),
+                    "is_abnormal": bool(is_node_abnormal),
+                    "deviation": dev_str
+                })
+            
+            # Besides the trigger variable itself, >= 1 additional abnormal node triggers a tracing alarm
+            if abnormal_other_nodes_count >= 1:
+                results.append({
+                    "trigger": t_name,
+                    "path": " -> ".join(path_names),
+                    "root_cause": path_names[-1],
+                    "details": path_details
+                })
+            
+        # If no trigger link satisfies the >= 1 extra abnormal node condition
+        if not results:
+            return {
+                "status": "normal",
+                "message": "系统运行正常",
+            }
+            
+        return {
+            "status": "abnormal",
+            "results": results
+        }
+
+
+if __name__ == "__main__":
+    
+    # Simulates the external caller: a fixed 2 hours of data is passed per call
+    parser = argparse.ArgumentParser(description="Water-plant online diagnosis test")
+    parser.add_argument('-p', '--plant', type=str, required=True, help="Name of the plant to test")
+    args = parser.parse_args()
+    
+    config.load(args.plant)
+    diagnoser = WaterPlantDiagnoser()
+    
+    test_file = os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}1.csv")
+    
+    if os.path.exists(test_file):
+        print(">>> 正在启动在线诊断引擎测试 (模式:读2小时,查末尾40分钟)...")
+        
+        # 2 hours = 7200 s = 1800 rows of raw data
+        CHUNK_SIZE = 1800 
+        
+        df_full = pd.read_csv(test_file, low_memory=False)
+        
+        if len(df_full) >= CHUNK_SIZE * 3:
+            import json
+            
+            # Simulation 1: pass the data for hours 0-2
+            df_sim1 = df_full.iloc[0 : CHUNK_SIZE]
+            print("\n[Test 1] data for hours 0-2:")
+            print(json.dumps(diagnoser.api_predict(df_sim1), ensure_ascii=False, indent=2))
+            
+            # Simulation 2: pass the data for hours 2-4
+            df_sim2 = df_full.iloc[CHUNK_SIZE : CHUNK_SIZE * 2]
+            print("\n[Test 2] data for hours 2-4:")
+            print(json.dumps(diagnoser.api_predict(df_sim2), ensure_ascii=False, indent=2))
+            
+            # Simulation 3: pass the final 2 hours of data
+            df_sim3 = df_full.iloc[-CHUNK_SIZE:]
+            print("\n[Test 3] final 2 hours of data:")
+            print(json.dumps(diagnoser.api_predict(df_sim3), ensure_ascii=False, indent=2))
+            
+        else:
+            import json
+            print("\n数据量不足以做三次两小时模拟,直接进行单次全量模拟:")
+            print(json.dumps(diagnoser.api_predict(df_full), ensure_ascii=False, indent=2))
+            
+    else:
+        print(f"测试文件 {test_file} 不存在,无法执行本地测试。")

BIN
models/Dynamic_anomaly_diagnosis/xishan/abnormal_link.xlsx


+ 67 - 0
models/Dynamic_anomaly_diagnosis/xishan/config.yaml

@@ -0,0 +1,67 @@
+# xishan/config.yaml
+project:
+  plant_name: "xishan"
+
+files:
+  # Bare file names; the system resolves them under ./xishan/ automatically
+  threshold_filename: "sensor_threshold.xlsx"
+  abnormal_link_filename: "abnormal_link.xlsx"
+  model_filename: "ppo_tracing_model.pth"
+  test_result_filename: "Final_Test_Report.xlsx"
+  sensor_file_prefix: "data_process_"
+
+data_processing:
+  sensor_file_num_range: [1, 119]
+  original_sample_interval: 4
+  target_sample_interval: 20
+  window_duration_min: 40
+  valid_data_ratio: 0.6
+  window_anomaly_threshold: 0.2
+  train_test_split: 0.8
+  trigger_score_thresh: 0.5
+  absolute_score_weight: 0.6
+  dynamic_score_weight: 0.4
+  mad_history_window: 360
+  mad_threshold: 3.0
+
+sensors:
+  keyword_layer: "One_layer"
+  keyword_device: "Device"
+  trigger_sensors:
+    - "UF1Per"
+    - "UF2Per"
+    - "UF3Per"
+    - "UF4Per"
+    - "C.M.RO1_DB@DPT_1"
+    - "C.M.RO2_DB@DPT_1"
+    - "C.M.RO3_DB@DPT_1"
+    - "C.M.RO4_DB@DPT_1"
+    - "C.M.RO1_DB@DPT_2"
+    - "C.M.RO2_DB@DPT_2"
+    - "C.M.RO3_DB@DPT_2"
+    - "C.M.RO4_DB@DPT_2"
+    - "RO1_CSFlow"
+    - "RO2_CSFlow"
+    - "RO3_CSFlow"
+    - "RO4_CSFlow"
+    - "RO1HSL"
+    - "RO2HSL"
+    - "RO3HSL"
+    - "RO4HSL"
+    - "RO1_TYL"
+    - "RO2_TYL"
+    - "RO3_TYL"
+    - "RO4_TYL"
+
+rl_params:
+  min_path_length: 3
+  max_path_length: 6
+  embedding_dim: 64
+  hidden_dim: 256
+  ppo_lr: 0.0003
+  ppo_gamma: 0.90
+  ppo_eps_clip: 0.2
+  ppo_k_epochs: 10
+  ppo_batch_size: 64
+  bc_epochs: 20000
+  rl_episodes: 20000

BIN
models/Dynamic_anomaly_diagnosis/xishan/ppo_tracing_model.pth


BIN
models/Dynamic_anomaly_diagnosis/xishan/sensor_threshold.xlsx