|
@@ -59,9 +59,12 @@ class WaterPlantDiagnoser:
|
|
|
print(f"数据预处理错误: {e}")
|
|
print(f"数据预处理错误: {e}")
|
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
- def api_predict(self, df_raw):
|
|
|
|
|
|
|
+ def api_predict(self, df_raw, mode='auto', manual_trigger=None):
|
|
|
"""
|
|
"""
|
|
|
对外接口: 每次输入过去2小时的数据,取最新的40分钟进行诊断
|
|
对外接口: 每次输入过去2小时的数据,取最新的40分钟进行诊断
|
|
|
|
|
+ :param df_raw: 原始DataFrame数据
|
|
|
|
|
+ :param mode: 'auto'为自动触发模式, 'manual'为手动指定触发模式
|
|
|
|
|
+ :param manual_trigger: 当mode='manual'时生效,指定诱发变量的ID
|
|
|
"""
|
|
"""
|
|
|
# --- Layer 1: 数据预处理 ---
|
|
# --- Layer 1: 数据预处理 ---
|
|
|
df_resampled = self._preprocess_dataframe(df_raw)
|
|
df_resampled = self._preprocess_dataframe(df_raw)
|
|
@@ -113,12 +116,26 @@ class WaterPlantDiagnoser:
|
|
|
results = []
|
|
results = []
|
|
|
active_triggers = []
|
|
active_triggers = []
|
|
|
|
|
|
|
|
- for t_name in config.TRIGGER_SENSORS:
|
|
|
|
|
- if t_name in self.causal_graph['sensor_to_idx']:
|
|
|
|
|
- idx = self.causal_graph['sensor_to_idx'][t_name]
|
|
|
|
|
- score = current_window_scores[idx]
|
|
|
|
|
- if score > config.TRIGGER_SCORE_THRESH:
|
|
|
|
|
- active_triggers.append((t_name, idx, score))
|
|
|
|
|
|
|
+ # 根据模式判定触发列表
|
|
|
|
|
+ if mode == 'auto':
|
|
|
|
|
+ for t_name in config.TRIGGER_SENSORS:
|
|
|
|
|
+ if t_name in self.causal_graph['sensor_to_idx']:
|
|
|
|
|
+ idx = self.causal_graph['sensor_to_idx'][t_name]
|
|
|
|
|
+ score = current_window_scores[idx]
|
|
|
|
|
+ if score > config.TRIGGER_SCORE_THRESH:
|
|
|
|
|
+ active_triggers.append((t_name, idx, score))
|
|
|
|
|
+ elif mode == 'manual':
|
|
|
|
|
+ if not manual_trigger:
|
|
|
|
|
+ return {"status": "error", "message": "手动模式下必须提供 manual_trigger 参数"}
|
|
|
|
|
+ if manual_trigger not in self.causal_graph['sensor_to_idx']:
|
|
|
|
|
+ return {"status": "error", "message": f"指定的诱发变量 {manual_trigger} 不在设备/传感器列表中"}
|
|
|
|
|
+
|
|
|
|
|
+ # 手动模式:强制加入触发列表,不检测得分阈值
|
|
|
|
|
+ idx = self.causal_graph['sensor_to_idx'][manual_trigger]
|
|
|
|
|
+ score = current_window_scores[idx]
|
|
|
|
|
+ active_triggers.append((manual_trigger, idx, score))
|
|
|
|
|
+ else:
|
|
|
|
|
+ return {"status": "error", "message": f"未知的诊断模式: {mode}"}
|
|
|
|
|
|
|
|
if not active_triggers:
|
|
if not active_triggers:
|
|
|
return {
|
|
return {
|
|
@@ -227,8 +244,10 @@ class WaterPlantDiagnoser:
|
|
|
"deviation": dev_str
|
|
"deviation": dev_str
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
- # 除诱发变量外,只要有 >= 1 个节点异常即触发溯源报警
|
|
|
|
|
- if abnormal_other_nodes_count >= 1:
|
|
|
|
|
|
|
+ # 判断是否将本链路加入返回结果
|
|
|
|
|
+ # 自动模式下要求:除诱发变量外,至少有 >= 1 个额外节点异常
|
|
|
|
|
+ # 手动模式下要求:无视拦截规则,强制返回链路用于排查
|
|
|
|
|
+ if mode == 'manual' or abnormal_other_nodes_count >= 1:
|
|
|
results.append({
|
|
results.append({
|
|
|
"trigger": t_name,
|
|
"trigger": t_name,
|
|
|
"path": " -> ".join(path_names),
|
|
"path": " -> ".join(path_names),
|
|
@@ -236,7 +255,7 @@ class WaterPlantDiagnoser:
|
|
|
"details": path_details
|
|
"details": path_details
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
- # 若所有触发链路均不满足 >= 1 个额外异常节点的条件
|
|
|
|
|
|
|
+ # 若所有触发链路均不满足 >= 1 个额外异常节点的条件 (仅在auto模式下发生)
|
|
|
if not results:
|
|
if not results:
|
|
|
return {
|
|
return {
|
|
|
"status": "normal",
|
|
"status": "normal",
|
|
@@ -244,7 +263,7 @@ class WaterPlantDiagnoser:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return {
|
|
return {
|
|
|
- "status": "abnormal",
|
|
|
|
|
|
|
+ "status": "manual_traced" if mode == 'manual' else "abnormal",
|
|
|
"results": results
|
|
"results": results
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -272,25 +291,28 @@ if __name__ == "__main__":
|
|
|
if len(df_full) >= CHUNK_SIZE * 3:
|
|
if len(df_full) >= CHUNK_SIZE * 3:
|
|
|
import json
|
|
import json
|
|
|
|
|
|
|
|
- # 模拟 1:传入 0~2 小时的数据
|
|
|
|
|
|
|
+ # 模拟 1:传入 0~2 小时的数据(自动模式测试)
|
|
|
df_sim1 = df_full.iloc[0 : CHUNK_SIZE]
|
|
df_sim1 = df_full.iloc[0 : CHUNK_SIZE]
|
|
|
- print("\n[测试 1] 传入 0~2 小时数据:")
|
|
|
|
|
- print(json.dumps(diagnoser.api_predict(df_sim1), ensure_ascii=False, indent=2))
|
|
|
|
|
|
|
+ print("\n[测试 1] 传入 0~2 小时数据 (自动模式):")
|
|
|
|
|
+ print(json.dumps(diagnoser.api_predict(df_sim1, mode='auto'), ensure_ascii=False, indent=2))
|
|
|
|
|
|
|
|
- # 模拟 2:传入 2~4 小时的数据
|
|
|
|
|
|
|
+ # 模拟 2:传入 2~4 小时的数据(手动模式测试)
|
|
|
df_sim2 = df_full.iloc[CHUNK_SIZE : CHUNK_SIZE * 2]
|
|
df_sim2 = df_full.iloc[CHUNK_SIZE : CHUNK_SIZE * 2]
|
|
|
- print("\n[测试 2] 传入 2~4 小时数据:")
|
|
|
|
|
- print(json.dumps(diagnoser.api_predict(df_sim2), ensure_ascii=False, indent=2))
|
|
|
|
|
|
|
+ print("\n[测试 2] 传入 2~4 小时数据 (手动模式,指定触发):")
|
|
|
|
|
+ # 从 config 自动抓取一个诱发变量,或者固定想测试的,如 'UF1Per'
|
|
|
|
|
+ test_trigger = config.TRIGGER_SENSORS[0] if len(config.TRIGGER_SENSORS) > 0 else "Unknown"
|
|
|
|
|
+ print(json.dumps(diagnoser.api_predict(df_sim2, mode='manual', manual_trigger=test_trigger), ensure_ascii=False, indent=2))
|
|
|
|
|
|
|
|
- # 模拟 3:传入末尾 2 小时的数据
|
|
|
|
|
|
|
+ # 模拟 3:传入末尾 2 小时的数据(自动模式测试)
|
|
|
df_sim3 = df_full.iloc[-CHUNK_SIZE:]
|
|
df_sim3 = df_full.iloc[-CHUNK_SIZE:]
|
|
|
- print("\n[测试 3] 传入 4~6 小时数据:")
|
|
|
|
|
- print(json.dumps(diagnoser.api_predict(df_sim3), ensure_ascii=False, indent=2))
|
|
|
|
|
|
|
+ print("\n[测试 3] 传入 4~6 小时数据 (自动模式):")
|
|
|
|
|
+ print(json.dumps(diagnoser.api_predict(df_sim3, mode='auto'), ensure_ascii=False, indent=2))
|
|
|
|
|
|
|
|
else:
|
|
else:
|
|
|
import json
|
|
import json
|
|
|
- print("\n数据量不足以做三次两小时模拟,直接进行单次全量模拟:")
|
|
|
|
|
- print(json.dumps(diagnoser.api_predict(df_full), ensure_ascii=False, indent=2))
|
|
|
|
|
|
|
+ print("\n数据量不足以做三次两小时模拟,直接进行单次全量手动模拟:")
|
|
|
|
|
+ test_trigger = config.TRIGGER_SENSORS[0] if len(config.TRIGGER_SENSORS) > 0 else "Unknown"
|
|
|
|
|
+ print(json.dumps(diagnoser.api_predict(df_full, mode='manual', manual_trigger=test_trigger), ensure_ascii=False, indent=2))
|
|
|
|
|
|
|
|
else:
|
|
else:
|
|
|
print(f"测试文件 {test_file} 不存在,无法执行本地测试。")
|
|
print(f"测试文件 {test_file} 不存在,无法执行本地测试。")
|