# -*- coding: utf-8 -*-
"""test.py: 在线诊断接口

Online diagnosis entry point: wraps the Layer-1 anomaly scoring, the Layer-2
causal graph and the Layer-3 RL tracing model behind a single `api_predict`
call that consumes a raw 2-hour sensor DataFrame and diagnoses the most
recent 40 minutes.
"""
import os

import numpy as np
import pandas as pd
import torch

from config import config
from data_processing import DataAnomalyProcessor
from causal_structure import CausalStructureBuilder
from rl_tracing import RLTrainer, CausalTracingEnv


class WaterPlantDiagnoser:
    """Online diagnoser combining anomaly scoring, causal graph and RL tracing."""

    def __init__(self):
        # 1. Data processor (loads thresholds and computes scores; the anomaly
        #    representation logic is kept strictly identical to training).
        self.processor = DataAnomalyProcessor()
        self.sensor_list = self.processor.sensor_list
        self.threshold_df = self.processor.threshold_df

        # 2. Build the causal graph (Layer 2).
        self.builder = CausalStructureBuilder(self.threshold_df)
        self.causal_graph = self.builder.build()

        # 3. Load the RL tracing model (Layer 3). The trainer needs a score
        #    matrix at construction time, so feed a zero placeholder; real
        #    scores are supplied per-request via a temporary environment.
        dummy_scores = np.zeros((1, len(self.sensor_list)), dtype=np.float32)
        self.trainer = RLTrainer(self.causal_graph, dummy_scores, self.threshold_df)
        model_path = os.path.join(config.MODEL_SAVE_DIR, "ppo_tracing_model.pth")
        if not os.path.exists(model_path):
            print(f"[Warning] 未找到模型文件: {model_path}。如果是测试环境,请确保已有预训练模型。")
        else:
            # weights_only=True avoids unpickling arbitrary objects from disk.
            state_dict = torch.load(model_path, map_location=torch.device('cpu'), weights_only=True)
            self.trainer.model.load_state_dict(state_dict)
            self.trainer.model.eval()

    def _preprocess_dataframe(self, df_input):
        """Strictly reproduce the training-time preprocessing.

        Parses the first column as the timestamp index, keeps only known
        sensor columns and resamples to the target interval. Returns the
        resampled float32 DataFrame, or None on any failure (best-effort:
        errors are printed, not raised, so the API can report them).
        """
        try:
            df = df_input.copy()
            # 1. Parse timestamps; rows with unparseable times are dropped.
            time_col = df.columns[0]
            df[time_col] = pd.to_datetime(df[time_col], format='mixed', errors='coerce')
            df = df.dropna(subset=[time_col]).set_index(time_col)
            # 2. Keep only columns that are known sensors.
            valid_cols = [c for c in df.columns if c in self.sensor_list]
            if not valid_cols:
                return None
            df_valid = df[valid_cols]
            # 3. Downsample (e.g. 4s -> 20s) by mean aggregation.
            df_resampled = df_valid.resample(f"{config.TARGET_SAMPLE_INTERVAL}s").mean()
            df_resampled = df_resampled.astype(np.float32)
            return df_resampled
        except Exception as e:
            print(f"数据预处理错误: {e}")
            return None

    def api_predict(self, df_raw):
        """对外接口: 每次输入过去2小时的数据,取最新的40分钟进行诊断

        Returns a status dict: "error" on preprocessing failure, "warning"
        when less than 40 minutes of data is available, "normal" when no
        trigger fires (or no traced path has an extra abnormal node), and
        "abnormal" with per-trigger tracing results otherwise.
        """
        # --- Layer 1: preprocessing ---
        df_resampled = self._preprocess_dataframe(df_raw)
        if df_resampled is None or df_resampled.empty:
            # NOTE(review): this branch uses key "msg" while the warning
            # branch below uses "message" — callers must handle both;
            # left unchanged to stay backward-compatible.
            return {"status": "error", "msg": "数据预处理失败或数据为空"}

        # --- Layer 1: per-point anomaly scores (absolute + MAD combined) ---
        scores_dict = {}
        for sensor in self.sensor_list:
            if sensor in df_resampled.columns:
                scores_dict[sensor] = self.processor._calculate_point_score_vectorized(
                    df_resampled[sensor], sensor
                )
            else:
                # Strict alignment with training: missing sensors become NaN.
                scores_dict[sensor] = pd.Series(np.nan, index=df_resampled.index, dtype=np.float32)
        point_score_df = pd.DataFrame(scores_dict)[self.sensor_list]

        # --- Layer 1: take the latest 40 minutes for window aggregation ---
        req_points = config.POINTS_PER_WINDOW  # 40 min = 120 points at 20 s
        total_points = len(point_score_df)
        if total_points < req_points:
            return {
                "status": "warning",
                "message": f"传入数据不足40分钟 (当前: {total_points*20}s, 需要: {req_points*20}s)"
            }

        # Slice: last 40 min of scores (for anomaly aggregation) and of raw
        # data (for deviation percentages shown to operators).
        latest_40min_scores = point_score_df.iloc[-req_points:].values
        latest_40min_raw = df_resampled.iloc[-req_points:]
        # Mean over the window serves as the displayed reference value.
        real_time_values = latest_40min_raw.mean(skipna=True)

        # Valid-data ratio gating + 95th-percentile window aggregation.
        valid_counts = np.sum(~np.isnan(latest_40min_scores), axis=0)
        valid_ratios = valid_counts / req_points
        current_window_scores = np.zeros(latest_40min_scores.shape[1], dtype=np.float32)
        valid_mask = valid_ratios >= config.VALID_DATA_RATIO
        if np.any(valid_mask):
            current_window_scores[valid_mask] = np.nanquantile(
                latest_40min_scores[:, valid_mask], 0.95, axis=0
            )
        current_window_scores = np.clip(current_window_scores, 0, 1)

        # --- Layer 2 & 3: trigger detection and causal tracing ---
        results = []
        active_triggers = []
        for t_name in config.TRIGGER_SENSORS:
            if t_name in self.causal_graph['sensor_to_idx']:
                idx = self.causal_graph['sensor_to_idx'][t_name]
                score = current_window_scores[idx]
                if score > config.TRIGGER_SCORE_THRESH:
                    active_triggers.append((t_name, idx, score))

        if not active_triggers:
            return {
                "status": "normal",
                "message": "系统运行正常",
            }

        # Temporary environment seeded with this request's window scores.
        env_scores = current_window_scores.reshape(1, -1)
        temp_env = CausalTracingEnv(self.causal_graph, env_scores,
                                    self.threshold_df, self.trainer.expert_knowledge)

        for t_name, t_idx, t_score in active_triggers:
            state_data = temp_env.reset(force_window_idx=0, force_trigger=t_name)
            path_idxs = [t_idx]
            done = False
            while not done:
                s_int = state_data[0].unsqueeze(0)
                s_float = state_data[1].unsqueeze(0)
                valid = temp_env.get_valid_actions(path_idxs[-1])
                if len(valid) == 0:
                    break
                # Greedy decoding: mask invalid actions with -1e9, take argmax.
                logits, _ = self.trainer.model(s_int, s_float)
                mask = torch.full_like(logits, -1e9)
                mask[0, valid] = 0
                act = torch.argmax(logits + mask, dim=1).item()
                state_data, _, done, _ = temp_env.step(act)
                path_idxs.append(act)
                if len(path_idxs) >= config.MAX_PATH_LENGTH:
                    done = True

            path_names = [self.trainer.idx_to_sensor[i] for i in path_idxs]

            # --- 1 & 2: path validity check + dual deviation quantification ---
            path_details = []
            abnormal_other_nodes_count = 0
            for node_idx, node_name in zip(path_idxs, path_names):
                node_score = current_window_scores[node_idx]
                is_node_abnormal = node_score > config.WINDOW_ANOMALY_THRESHOLD
                # Count abnormal nodes other than the trigger itself.
                if node_idx != t_idx and is_node_abnormal:
                    abnormal_other_nodes_count += 1

                # 1. Current reference value for this node.
                raw_val = real_time_values.get(node_name, np.nan)

                # 2. Replay the rolling-MAD history to recover the dynamic
                #    baseline and its upper/lower bounds at the window end.
                dyn_lower, dyn_upper, dyn_med = np.nan, np.nan, np.nan
                if node_name in df_resampled.columns:
                    vals = df_resampled[node_name].astype(np.float32)
                    r_med = vals.rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()
                    r_mad = (vals - r_med).abs().rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()
                    # Last values act as the current baseline state.
                    last_med = r_med.iloc[-1]
                    last_mad = r_mad.iloc[-1]
                    dyn_lower = last_med - config.MAD_THRESHOLD * last_mad
                    dyn_upper = last_med + config.MAD_THRESHOLD * last_mad
                    dyn_med = last_med

                # 3. Build the physical-range and dynamic-range display text.
                phy_str = "未定义"
                dyn_str = "未定义"
                node_name_zh = "未知"
                if node_name in self.processor.threshold_dict:
                    info = self.processor.threshold_dict[node_name]
                    node_name_zh = info.get('Name', '未知')
                if not pd.isna(raw_val) and node_name in self.processor.threshold_dict:
                    info = self.processor.threshold_dict[node_name]
                    g_min, g_max = info['Good_min'], info['Good_max']
                    # -- Physical range judgement --
                    if raw_val > g_max and g_max != np.inf:
                        pct = (raw_val - g_max) / (abs(g_max) + 1e-5) * 100
                        phy_str = f"偏高 {pct:.1f}% (物理允许上限: {g_max})"
                    elif raw_val < g_min and g_min != -np.inf:
                        pct = (g_min - raw_val) / (abs(g_min) + 1e-5) * 100
                        phy_str = f"偏低 {pct:.1f}% (物理允许下限: {g_min})"
                    else:
                        phy_str = f"正常 (物理范围: [{g_min}, {g_max}])"
                    # -- Dynamic range judgement (only when raw_val is known) --
                    if not pd.isna(dyn_lower) and not pd.isna(dyn_upper):
                        if raw_val > dyn_upper:
                            dyn_str = f"异常突增 (近期基线: {dyn_med:.2f}, 动态上限: {dyn_upper:.2f})"
                        elif raw_val < dyn_lower:
                            dyn_str = f"异常突降 (近期基线: {dyn_med:.2f}, 动态下限: {dyn_lower:.2f})"
                        else:
                            dyn_str = f"平稳波动 (近期基线: {dyn_med:.2f}, 动态区间: [{dyn_lower:.2f}, {dyn_upper:.2f}])"

                # Assemble the structured, human-readable deviation text.
                dev_str = f"当前值: {raw_val:.2f} | 物理工况: {phy_str} | 动态趋势: {dyn_str}"
                path_details.append({
                    "node": node_name,
                    "name": node_name_zh,
                    "anomaly_score": round(float(node_score), 4),
                    "is_abnormal": bool(is_node_abnormal),
                    "deviation": dev_str
                })

            # Besides the trigger itself, at least one other abnormal node on
            # the path is required before raising a tracing alarm.
            if abnormal_other_nodes_count >= 1:
                results.append({
                    "trigger": t_name,
                    "path": " -> ".join(path_names),
                    "root_cause": path_names[-1],
                    "details": path_details
                })

        # Every traced path failed the ">= 1 extra abnormal node" condition.
        if not results:
            return {
                "status": "normal",
                "message": "系统运行正常",
            }

        return {"status": "abnormal", "results": results}


if __name__ == "__main__":
    # Simulate the external caller: every call passes a fixed 2-hour chunk.
    test_file = os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}1.csv")
    if os.path.exists(test_file):
        import json  # hoisted: used by both branches below

        print(">>> 正在启动在线诊断引擎测试 (模式:读2小时,查末尾40分钟)...")
        diagnoser = WaterPlantDiagnoser()

        # Raw data is assumed to be sampled every 4 seconds:
        # 2 hours = 7200 s = 1800 raw rows.
        CHUNK_SIZE = 1800
        df_full = pd.read_csv(test_file, low_memory=False)

        if len(df_full) >= CHUNK_SIZE * 3:
            # Simulation 1: hours 0-2.
            df_sim1 = df_full.iloc[0:CHUNK_SIZE]
            print("\n[测试 1] 传入 0~2 小时数据:")
            print(json.dumps(diagnoser.api_predict(df_sim1), ensure_ascii=False, indent=2))

            # Simulation 2: hours 2-4.
            df_sim2 = df_full.iloc[CHUNK_SIZE:CHUNK_SIZE * 2]
            print("\n[测试 2] 传入 2~4 小时数据:")
            print(json.dumps(diagnoser.api_predict(df_sim2), ensure_ascii=False, indent=2))

            # Simulation 3: the final 2 hours.
            df_sim3 = df_full.iloc[-CHUNK_SIZE:]
            print("\n[测试 3] 传入 4~6 小时数据:")
            print(json.dumps(diagnoser.api_predict(df_sim3), ensure_ascii=False, indent=2))
        else:
            print("\n数据量不足以做三次两小时模拟,直接进行单次全量模拟:")
            print(json.dumps(diagnoser.api_predict(df_full), ensure_ascii=False, indent=2))
    else:
        print(f"测试文件 {test_file} 不存在,无法执行本地测试。")