| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- # -*- coding: utf-8 -*-
- """test.py: 在线诊断接口"""
- import os
- import pandas as pd
- import numpy as np
- import torch
- from config import config
- from data_processing import DataAnomalyProcessor
- from causal_structure import CausalStructureBuilder
- from rl_tracing import RLTrainer, CausalTracingEnv
class WaterPlantDiagnoser:
    """Online diagnosis engine wiring together the three pipeline layers.

    Layer 1: data preprocessing + per-point anomaly scoring (DataAnomalyProcessor).
    Layer 2: causal-graph construction (CausalStructureBuilder).
    Layer 3: RL-based root-cause tracing (RLTrainer / CausalTracingEnv).
    """

    def __init__(self):

        # 1. Initialise the data processor (loads thresholds and computes
        #    scores; the anomaly representation matches training exactly).
        self.processor = DataAnomalyProcessor()
        self.sensor_list = self.processor.sensor_list
        self.threshold_df = self.processor.threshold_df

        # 2. Build the causal graph (Layer 2).
        self.builder = CausalStructureBuilder(self.threshold_df)
        self.causal_graph = self.builder.build()

        # 3. Load the reinforcement-learning model (Layer 3). A dummy
        #    all-zero score matrix is enough to construct the trainer; real
        #    scores are injected per request inside api_predict().
        dummy_scores = np.zeros((1, len(self.sensor_list)), dtype=np.float32)
        self.trainer = RLTrainer(self.causal_graph, dummy_scores, self.threshold_df)

        model_path = os.path.join(config.MODEL_SAVE_DIR, "ppo_tracing_model.pth")
        if not os.path.exists(model_path):
            # Best-effort: keep running with freshly initialised weights so a
            # test environment without a checkpoint can still start up.
            print(f"[Warning] 未找到模型文件: {model_path}。如果是测试环境,请确保已有预训练模型。")
        else:
            state_dict = torch.load(model_path, map_location=torch.device('cpu'), weights_only=True)
            self.trainer.model.load_state_dict(state_dict)

        self.trainer.model.eval()

    def _preprocess_dataframe(self, df_input):
        """Strictly reproduce the training-time data-processing logic.

        Parses the first column as timestamps, keeps only known sensor
        columns, and downsamples to the configured sampling interval.

        Args:
            df_input: raw DataFrame whose first column is the timestamp.

        Returns:
            The resampled float32 DataFrame, or None on any failure or when
            no known sensor column is present.
        """
        try:
            df = df_input.copy()
            # 1. Timestamp parsing; rows with unparseable timestamps are dropped.
            time_col = df.columns[0]
            df[time_col] = pd.to_datetime(df[time_col], format='mixed', errors='coerce')
            df = df.dropna(subset=[time_col]).set_index(time_col)

            # 2. Keep only columns that are known sensors.
            valid_cols = [c for c in df.columns if c in self.sensor_list]
            if not valid_cols: return None
            df_valid = df[valid_cols]

            # 3. Downsample (e.g. 4 s -> 20 s) by mean aggregation.
            df_resampled = df_valid.resample(f"{config.TARGET_SAMPLE_INTERVAL}s").mean()
            df_resampled = df_resampled.astype(np.float32)

            return df_resampled
        except Exception as e:
            # Best-effort API boundary: report and signal failure via None.
            print(f"数据预处理错误: {e}")
            return None

    def api_predict(self, df_raw):
        """
        Public entry point: the caller passes roughly the past 2 hours of
        raw data; only the newest 40 minutes are diagnosed.

        Args:
            df_raw: raw sensor DataFrame (timestamp in column 0).

        Returns:
            A dict whose "status" is one of "error", "warning", "normal" or
            "abnormal"; "abnormal" additionally carries the per-trigger
            tracing results under "results".
        """
        # --- Layer 1: data preprocessing ---
        df_resampled = self._preprocess_dataframe(df_raw)
        if df_resampled is None or df_resampled.empty:
            # NOTE(review): this branch uses the key "msg" while the warning
            # branch below uses "message" — consider unifying for API callers.
            return {"status": "error", "msg": "数据预处理失败或数据为空"}

        # --- Layer 1: per-point anomaly scores (absolute + MAD combined) ---
        scores_dict = {}
        for sensor in self.sensor_list:
            if sensor in df_resampled.columns:
                scores_dict[sensor] = self.processor._calculate_point_score_vectorized(
                    df_resampled[sensor], sensor
                )
            else:
                # Strict alignment with training: absent sensors become NaN.
                scores_dict[sensor] = pd.Series(np.nan, index=df_resampled.index, dtype=np.float32)

        point_score_df = pd.DataFrame(scores_dict)[self.sensor_list]

        # --- Layer 1: latest 40 min for window aggregation and raw means ---
        req_points = config.POINTS_PER_WINDOW  # 40 min = 120 points at 20 s
        total_points = len(point_score_df)

        if total_points < req_points:
            return {
                "status": "warning",
                "message": f"传入数据不足40分钟 (当前: {total_points*20}s, 需要: {req_points*20}s)"
            }

        # Slice: last 40 min of scores (anomaly aggregation) and of raw
        # values (deviation percentages shown to operators).
        latest_40min_scores = point_score_df.iloc[-req_points:].values
        latest_40min_raw = df_resampled.iloc[-req_points:]
        # Mean of the raw values over the window, used as the displayed reference.
        real_time_values = latest_40min_raw.mean(skipna=True)

        # Per-sensor valid-data ratio and 95th-percentile aggregation.
        valid_counts = np.sum(~np.isnan(latest_40min_scores), axis=0)
        valid_ratios = valid_counts / req_points

        current_window_scores = np.zeros(latest_40min_scores.shape[1], dtype=np.float32)
        valid_mask = valid_ratios >= config.VALID_DATA_RATIO

        if np.any(valid_mask):
            # Columns failing the ratio check stay at 0 (treated as normal).
            current_window_scores[valid_mask] = np.nanquantile(latest_40min_scores[:, valid_mask], 0.95, axis=0)

        current_window_scores = np.clip(current_window_scores, 0, 1)

        # --- Layers 2 & 3: trigger detection and root-cause tracing ---
        results = []
        active_triggers = []

        for t_name in config.TRIGGER_SENSORS:
            if t_name in self.causal_graph['sensor_to_idx']:
                idx = self.causal_graph['sensor_to_idx'][t_name]
                score = current_window_scores[idx]
                if score > config.TRIGGER_SCORE_THRESH:
                    active_triggers.append((t_name, idx, score))
        if not active_triggers:
            return {
                "status": "normal",
                "message": "系统运行正常",
            }
        # Build a throwaway environment seeded with the current window scores.
        env_scores = current_window_scores.reshape(1, -1)
        temp_env = CausalTracingEnv(self.causal_graph, env_scores, self.threshold_df, self.trainer.expert_knowledge)

        for t_name, t_idx, t_score in active_triggers:
            state_data = temp_env.reset(force_window_idx=0, force_trigger=t_name)
            path_idxs = [t_idx]
            done = False

            # Greedy rollout: at each step pick the highest-logit action among
            # the environment's valid successors of the current node.
            while not done:
                s_int = state_data[0].unsqueeze(0)
                s_float = state_data[1].unsqueeze(0)
                valid = temp_env.get_valid_actions(path_idxs[-1])

                if len(valid) == 0: break

                logits, _ = self.trainer.model(s_int, s_float)
                # Mask invalid actions with a large negative logit before argmax.
                mask = torch.full_like(logits, -1e9)
                mask[0, valid] = 0
                act = torch.argmax(logits + mask, dim=1).item()

                state_data, _, done, _ = temp_env.step(act)
                path_idxs.append(act)
                if len(path_idxs) >= config.MAX_PATH_LENGTH: done = True

            path_names = [self.trainer.idx_to_sensor[i] for i in path_idxs]

            # --- Steps 1 & 2: path validity + dual deviation quantification ---
            path_details = []
            abnormal_other_nodes_count = 0

            for node_idx, node_name in zip(path_idxs, path_names):
                node_score = current_window_scores[node_idx]
                is_node_abnormal = node_score > config.WINDOW_ANOMALY_THRESHOLD

                # Count abnormal nodes other than the trigger variable itself.
                if node_idx != t_idx and is_node_abnormal:
                    abnormal_other_nodes_count += 1

                # 1. Current reference value for this node over the window.
                raw_val = real_time_values.get(node_name, np.nan)

                # 2. Re-derive the node's dynamic baseline and bounds by
                #    replaying the rolling-MAD history on the resampled data.
                dyn_lower, dyn_upper, dyn_med = np.nan, np.nan, np.nan
                if node_name in df_resampled.columns:
                    vals = df_resampled[node_name].astype(np.float32)
                    # Rolling median / MAD reconstruct the historical state.
                    r_med = vals.rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()
                    r_mad = (vals - r_med).abs().rolling(window=config.MAD_HISTORY_WINDOW, min_periods=1).median()

                    # Last values of the rolling series serve as the baseline.
                    last_med = r_med.iloc[-1]
                    last_mad = r_mad.iloc[-1]
                    dyn_lower = last_med - config.MAD_THRESHOLD * last_mad
                    dyn_upper = last_med + config.MAD_THRESHOLD * last_mad
                    dyn_med = last_med
                # 3. Human-readable texts for the physical and dynamic ranges.
                phy_str = "未定义"
                dyn_str = "未定义"
                node_name_zh = "未知"

                # Chinese display name from the threshold table.
                if node_name in self.processor.threshold_dict:
                    info = self.processor.threshold_dict[node_name]
                    node_name_zh = info.get('Name', '未知')

                if not pd.isna(raw_val) and node_name in self.processor.threshold_dict:
                    info = self.processor.threshold_dict[node_name]
                    g_min, g_max = info['Good_min'], info['Good_max']

                    # -- Physical (static) range verdict --
                    if raw_val > g_max and g_max != np.inf:
                        pct = (raw_val - g_max) / (abs(g_max) + 1e-5) * 100
                        phy_str = f"偏高 {pct:.1f}% (物理允许上限: {g_max})"
                    elif raw_val < g_min and g_min != -np.inf:
                        pct = (g_min - raw_val) / (abs(g_min) + 1e-5) * 100
                        phy_str = f"偏低 {pct:.1f}% (物理允许下限: {g_min})"
                    else:
                        phy_str = f"正常 (物理范围: [{g_min}, {g_max}])"

                    # -- Dynamic (trend) range verdict --
                    if not pd.isna(dyn_lower) and not pd.isna(dyn_upper):
                        if raw_val > dyn_upper:
                            dyn_str = f"异常突增 (近期基线: {dyn_med:.2f}, 动态上限: {dyn_upper:.2f})"
                        elif raw_val < dyn_lower:
                            dyn_str = f"异常突降 (近期基线: {dyn_med:.2f}, 动态下限: {dyn_lower:.2f})"
                        else:
                            dyn_str = f"平稳波动 (近期基线: {dyn_med:.2f}, 动态区间: [{dyn_lower:.2f}, {dyn_upper:.2f}])"
                # Assemble a structured one-line summary for this node.
                # NOTE(review): raw_val may be NaN here and renders as "nan".
                dev_str = f"当前值: {raw_val:.2f} | 物理工况: {phy_str} | 动态趋势: {dyn_str}"
                path_details.append({
                    "node": node_name,
                    "name": node_name_zh,
                    "anomaly_score": round(float(node_score), 4),
                    "is_abnormal": bool(is_node_abnormal),
                    "deviation": dev_str
                })

            # Alarm only if at least one node besides the trigger is abnormal.
            if abnormal_other_nodes_count >= 1:
                results.append({
                    "trigger": t_name,
                    "path": " -> ".join(path_names),
                    "root_cause": path_names[-1],
                    "details": path_details
                })

        # No trigger path satisfied the ">= 1 extra abnormal node" condition.
        if not results:
            return {
                "status": "normal",
                "message": "系统运行正常",
            }

        return {
            "status": "abnormal",
            "results": results
        }
if __name__ == "__main__":
    # Simulate the external calling convention: each call receives a fixed
    # 2-hour chunk of raw data.
    import json

    test_file = os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}1.csv")

    if os.path.exists(test_file):
        print(">>> 正在启动在线诊断引擎测试 (模式:读2小时,查末尾40分钟)...")
        diagnoser = WaterPlantDiagnoser()

        # Raw data is assumed to be sampled every 4 seconds:
        # 2 hours = 7200 s = 1800 raw rows.
        CHUNK_SIZE = 1800

        df_full = pd.read_csv(test_file, low_memory=False)

        def run_case(label, df_chunk):
            # One diagnosis pass with pretty-printed JSON output.
            print(f"\n{label}")
            print(json.dumps(diagnoser.api_predict(df_chunk), ensure_ascii=False, indent=2))

        if len(df_full) >= CHUNK_SIZE * 3:
            # Simulation 1: hours 0-2 of the file.
            run_case("[测试 1] 传入 0~2 小时数据:", df_full.iloc[0 : CHUNK_SIZE])

            # Simulation 2: hours 2-4 of the file.
            run_case("[测试 2] 传入 2~4 小时数据:", df_full.iloc[CHUNK_SIZE : CHUNK_SIZE * 2])

            # Simulation 3: the trailing 2 hours of the file. (The previous
            # label claimed "4~6 小时", which only holds when the file is
            # exactly 6 hours long.)
            run_case("[测试 3] 传入末尾 2 小时数据:", df_full.iloc[-CHUNK_SIZE:])
        else:
            print("\n数据量不足以做三次两小时模拟,直接进行单次全量模拟:")
            print(json.dumps(diagnoser.api_predict(df_full), ensure_ascii=False, indent=2))
    else:
        print(f"测试文件 {test_file} 不存在,无法执行本地测试。")
|