run_diagnosis.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. # -*- coding: utf-8 -*-
  2. """
  3. run_diagnosis.py: PLC数据实时诊断脚本
  4. 功能描述:
  5. 1. 从API逐个查询最近40分钟的传感器历史数据(按秒采样)
  6. 2. 降采样到4秒间隔,合并成DataFrame
  7. 3. 调用WaterPlantDiagnoser进行诊断
  8. 4. 将诊断结果追加到output_format.txt
  9. 使用方式:
  10. python run_diagnosis.py # 从API查询数据
  11. python run_diagnosis.py --local # 使用本地CSV测试
  12. 数据规格:
  13. - 查询时长: 40分钟 = 2400秒
  14. - 采样间隔: 每4秒一个点
  15. - 时间点数: 2400秒 ÷ 4秒 = 600 行
  16. - PLC点位: 158 列
  17. 依赖:
  18. - test.py中的WaterPlantDiagnoser类
  19. - 模型文件 models/ppo_tracing_model.pth
  20. - 阈值表 sensor_threshold.xlsx
  21. """
  22. import os
  23. import json
  24. import requests
  25. import pandas as pd
  26. import numpy as np
  27. from datetime import datetime, timedelta
  28. import logging
  29. from config import config
  30. # 日志配置
  31. logging.basicConfig(
  32. level=logging.INFO,
  33. format='%(asctime)s [%(levelname)s] %(message)s',
  34. datefmt='%Y-%m-%d %H:%M:%S'
  35. )
  36. logger = logging.getLogger(__name__)
  37. BASE_DIR = os.path.dirname(os.path.abspath(__file__))
  38. # ===================== API配置 =====================
  39. # 历史数据查询API,与loop_main.py中的API保持一致
  40. API_BASE_URL = "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data"
  41. # 请求头配置(与loop_main.py保持一致)
  42. HEADERS = {
  43. "Content-Type": "application/json",
  44. "JWT-TOKEN": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M"
  45. }
  46. # 项目ID
  47. PROJECT_ID = "92"
  48. # 设备ID(默认为1,根据实际情况调整)
  49. DEVICE_ID = "1"
  50. # 采样间隔配置
  51. SAMPLE_INTERVAL_SECONDS = 4 # 采样间隔:每4秒一个点
  52. DURATION_MINUTES = 40 # 查询时长:40分钟
  53. EXPECTED_ROWS = DURATION_MINUTES * 60 // SAMPLE_INTERVAL_SECONDS # 时间点数:2400秒 / 4秒 = 600行
  54. def load_plc_points(input_file: str) -> list:
  55. """
  56. 从input_format.txt加载PLC点位列表
  57. Args:
  58. input_file: 输入文件路径
  59. Returns:
  60. 点位名称列表(去掉第一行的'index')
  61. """
  62. points = []
  63. with open(input_file, 'r', encoding='utf-8-sig') as f: # utf-8-sig自动去除BOM
  64. for line in f:
  65. # 去除空白字符和换行符
  66. point = line.strip()
  67. # 跳过空行和index行(不区分大小写)
  68. if point and point.lower() != 'index':
  69. points.append(point)
  70. logger.info(f"加载了 {len(points)} 个PLC点位")
  71. return points
  72. def query_single_point_history(item_name: str, start_time: datetime, end_time: datetime) -> pd.Series:
  73. """
  74. 通过API查询单个点位的历史数据(按秒采样)
  75. Args:
  76. item_name: 数据项名称(PLC点位)
  77. start_time: 开始时间
  78. end_time: 结束时间
  79. Returns:
  80. pd.Series: 以时间为索引的数据序列
  81. """
  82. # 转换时间为毫秒级时间戳
  83. start_timestamp = int(start_time.timestamp() * 1000)
  84. end_timestamp = int(end_time.timestamp() * 1000)
  85. # API参数:按秒查询
  86. params = {
  87. "deviceid": DEVICE_ID,
  88. "dataitemid": item_name,
  89. "project_id": PROJECT_ID,
  90. "stime": start_timestamp,
  91. "etime": end_timestamp,
  92. "size": "1",
  93. "interval": "s", # 按分钟采样(与loop_main.py一致)
  94. "aggregator": "new" # 取最新值
  95. }
  96. try:
  97. response = requests.get(API_BASE_URL, params=params, headers=HEADERS, timeout=60)
  98. response.raise_for_status()
  99. data = response.json()
  100. # 检查API响应状态
  101. if data.get("code") != 200:
  102. # 第一个点位时打印详细错误,帮助排查
  103. if item_name == "C.M.UF1_PT_JS@out":
  104. logger.error(f"API返回错误: code={data.get('code')}, msg={data.get('msg', 'N/A')}")
  105. return pd.Series(dtype=float)
  106. if data.get("code") == 200 and data.get("data"):
  107. # 解析数据
  108. time_list = []
  109. val_list = []
  110. for item in data["data"]:
  111. if item.get("val") is not None and item.get("htime_at") is not None:
  112. try:
  113. time_str = item.get("htime_at")
  114. val = float(item.get("val"))
  115. time_list.append(time_str)
  116. val_list.append(val)
  117. except (ValueError, TypeError):
  118. pass
  119. if time_list:
  120. # 创建Series
  121. series = pd.Series(val_list, index=pd.to_datetime(time_list, format='mixed', errors='coerce'))
  122. series = series[~series.index.isna()] # 去除无效时间
  123. series = series.sort_index()
  124. return series
  125. return pd.Series(dtype=float)
  126. except requests.exceptions.RequestException as e:
  127. logger.warning(f"查询点位 {item_name} 网络失败: {e}")
  128. # 第一个点位时打印网络错误详情
  129. if item_name == "C.M.UF1_PT_JS@out":
  130. logger.error(f"API网络请求失败: {e}")
  131. return pd.Series(dtype=float)
  132. except (json.JSONDecodeError, ValueError, KeyError) as e:
  133. logger.warning(f"解析点位 {item_name} 响应失败: {e}")
  134. return pd.Series(dtype=float)
  135. def resample_to_4s(series: pd.Series, start_time: datetime, end_time: datetime) -> pd.Series:
  136. """
  137. 将原始数据降采样到4秒间隔
  138. Args:
  139. series: 原始时间序列数据
  140. start_time: 开始时间
  141. end_time: 结束时间
  142. Returns:
  143. 降采样后的Series
  144. """
  145. # 创建4秒间隔的时间索引
  146. time_index = pd.date_range(start=start_time, end=end_time, freq=f'{SAMPLE_INTERVAL_SECONDS}s')
  147. if series.empty:
  148. # 返回空序列
  149. return pd.Series(np.nan, index=time_index)
  150. # 重采样:使用均值聚合
  151. resampled = series.resample(f'{SAMPLE_INTERVAL_SECONDS}s').mean()
  152. # 重新索引到标准时间点
  153. resampled = resampled.reindex(time_index, method='nearest', tolerance=pd.Timedelta(seconds=SAMPLE_INTERVAL_SECONDS))
  154. # 前向填充缺失值
  155. resampled = resampled.ffill().bfill()
  156. return resampled
  157. def query_all_points_history(plc_points: list, duration_minutes: int = DURATION_MINUTES) -> pd.DataFrame:
  158. """
  159. 逐个查询所有PLC点位的历史数据,降采样后合并为DataFrame
  160. 异常处理:
  161. - 查询失败的点位填充NaN,后续通过ffill/bfill处理
  162. Args:
  163. plc_points: PLC点位列表
  164. duration_minutes: 查询时长(分钟),默认40分钟
  165. Returns:
  166. DataFrame,第一列为时间索引,后续为PLC点位列
  167. """
  168. # 多查1分钟确保时间跨度 >= 40分钟
  169. query_duration = duration_minutes + 1 # 41分钟
  170. end_time = datetime.now()
  171. start_time = end_time - timedelta(minutes=query_duration)
  172. logger.info(f"查询时间范围: {start_time.strftime('%Y-%m-%d %H:%M:%S')} 至 {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
  173. logger.info(f"查询时长: {query_duration}分钟 (确保>=40分钟)")
  174. logger.info(f"开始逐个查询 {len(plc_points)} 个点位...")
  175. # 创建标准时间索引(基于查询时长)
  176. time_index = pd.date_range(start=start_time, end=end_time, freq=f'{SAMPLE_INTERVAL_SECONDS}s')
  177. num_rows = len(time_index)
  178. # 使用字典收集所有列数据,避免DataFrame碎片化
  179. data_dict = {'index': time_index}
  180. success_count = 0
  181. fail_count = 0
  182. for i, point in enumerate(plc_points):
  183. # 进度显示
  184. if (i + 1) % 10 == 0 or i == 0:
  185. logger.info(f"查询进度: {i + 1}/{len(plc_points)} ({(i+1)/len(plc_points)*100:.1f}%)")
  186. # 查询单个点位
  187. raw_series = query_single_point_history(point, start_time, end_time)
  188. if not raw_series.empty:
  189. # 查询成功:降采样到4秒间隔
  190. resampled = resample_to_4s(raw_series, start_time, end_time)
  191. # 对齐到标准时间索引
  192. if len(resampled) >= num_rows:
  193. point_data = resampled.values[:num_rows]
  194. else:
  195. point_data = np.full(num_rows, np.nan)
  196. point_data[:len(resampled)] = resampled.values
  197. data_dict[point] = point_data
  198. success_count += 1
  199. else:
  200. # 查询失败:直接填充0
  201. data_dict[point] = np.zeros(num_rows)
  202. fail_count += 1
  203. logger.warning(f"点位 {point} 查询失败,填充0")
  204. logger.info(f"查询完成: 成功 {success_count}/{len(plc_points)}, 失败 {fail_count} 个")
  205. # 一次性创建DataFrame
  206. result_df = pd.DataFrame(data_dict)
  207. logger.info(f"最终DataFrame大小: {result_df.shape}")
  208. return result_df
  209. def run_diagnosis(df: pd.DataFrame) -> dict:
  210. """
  211. 调用WaterPlantDiagnoser进行诊断
  212. Args:
  213. df: 包含时间列的传感器数据DataFrame(600行×159列,含时间列)
  214. Returns:
  215. 诊断结果字典
  216. """
  217. # 延迟导入,避免循环依赖
  218. from test import WaterPlantDiagnoser
  219. try:
  220. logger.info("初始化诊断器...")
  221. diagnoser = WaterPlantDiagnoser()
  222. logger.info(f"开始诊断,输入数据: {df.shape}")
  223. result = diagnoser.api_predict(df)
  224. return result
  225. except FileNotFoundError as e:
  226. logger.error(f"模型文件缺失: {e}")
  227. return {"status": "error", "message": str(e)}
  228. except Exception as e:
  229. logger.error(f"诊断过程出错: {e}")
  230. import traceback
  231. traceback.print_exc()
  232. return {"status": "error", "message": str(e)}
  233. def append_result_to_file(result: dict, output_file: str):
  234. """
  235. 将诊断结果追加写入到输出文件
  236. Args:
  237. result: 诊断结果字典
  238. output_file: 输出文件路径
  239. """
  240. # 添加时间戳
  241. result_with_time = {
  242. "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
  243. **result
  244. }
  245. # 将结果转换为JSON字符串
  246. result_str = json.dumps(result_with_time, ensure_ascii=False)
  247. # 追加写入文件
  248. with open(output_file, 'a', encoding='utf-8') as f:
  249. f.write(result_str + '\n')
  250. logger.info(f"结果已追加到: {output_file}")
  251. def main():
  252. """
  253. 主函数:执行完整的诊断流程
  254. """
  255. logger.info("=" * 60)
  256. logger.info(" 水厂异常诊断系统 - 实时诊断")
  257. logger.info("=" * 60)
  258. # 文件路径
  259. input_file = os.path.join(BASE_DIR, "input_format.txt")
  260. output_file = os.path.join(BASE_DIR, "output_format.txt")
  261. # 数据保存目录
  262. data_save_dir = os.path.join(BASE_DIR, "data_history")
  263. os.makedirs(data_save_dir, exist_ok=True)
  264. # 步骤1: 加载PLC点位列表
  265. if not os.path.exists(input_file):
  266. logger.error(f"点位配置文件不存在: {input_file}")
  267. return
  268. plc_points = load_plc_points(input_file)
  269. if not plc_points:
  270. logger.error("未能加载任何PLC点位")
  271. return
  272. # 步骤2: 逐个查询40分钟历史数据,降采样到4秒间隔
  273. df = query_all_points_history(plc_points, duration_minutes=DURATION_MINUTES)
  274. if df.empty or len(df) < 100:
  275. result = {"status": "error", "message": "无法获取足够的历史数据"}
  276. append_result_to_file(result, output_file)
  277. return
  278. # 步骤3: 保存原始数据到文件(带时间戳,保留2位小数)
  279. timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')
  280. data_filename = f"plc_data_{timestamp_str}.csv"
  281. data_filepath = os.path.join(data_save_dir, data_filename)
  282. # 数值列保留2位小数
  283. df_to_save = df.copy()
  284. for col in df_to_save.columns:
  285. if col != 'index' and df_to_save[col].dtype in ['float64', 'float32']:
  286. df_to_save[col] = df_to_save[col].round(2)
  287. df_to_save.to_csv(data_filepath, index=False, encoding='utf-8')
  288. logger.info(f"原始数据已保存到: {data_filepath}")
  289. # 步骤4: 执行诊断
  290. result = run_diagnosis(df)
  291. logger.info(f"诊断结果: {result}")
  292. # 步骤5: 追加结果到输出文件
  293. append_result_to_file(result, output_file)
  294. logger.info("诊断完成")
  295. def run_loop(interval_minutes: int = 40):
  296. """
  297. 循环运行模式:每隔指定分钟自动执行一次诊断
  298. Args:
  299. interval_minutes: 运行间隔(分钟),默认40分钟
  300. """
  301. import time
  302. logger.info("=" * 60)
  303. logger.info(f" 水厂异常诊断系统 - 循环运行模式")
  304. logger.info(f" 运行间隔: 每 {interval_minutes} 分钟执行一次")
  305. logger.info("=" * 60)
  306. run_count = 0
  307. while True:
  308. try:
  309. run_count += 1
  310. logger.info(f"\n{'='*40}")
  311. logger.info(f"开始第 {run_count} 轮诊断...")
  312. logger.info(f"{'='*40}")
  313. # 执行一次诊断
  314. main()
  315. # 计算下一次运行时间
  316. next_run_time = datetime.now() + timedelta(minutes=interval_minutes)
  317. logger.info(f"本轮完成,下一轮将在 {next_run_time.strftime('%Y-%m-%d %H:%M:%S')} 开始")
  318. logger.info(f"等待 {interval_minutes} 分钟...")
  319. # 等待指定时间
  320. time.sleep(interval_minutes * 60)
  321. except KeyboardInterrupt:
  322. logger.info("\n收到中断信号,停止循环运行")
  323. break
  324. except Exception as e:
  325. logger.error(f"诊断过程出错: {e}")
  326. import traceback
  327. traceback.print_exc()
  328. # 出错后继续等待下一轮
  329. logger.info(f"等待 {interval_minutes} 分钟后重试...")
  330. time.sleep(interval_minutes * 60)
  331. # ===================== 本地模拟测试函数 =====================
  332. def test_with_local_data():
  333. """
  334. 使用本地CSV文件进行测试(不依赖API)
  335. 用于在没有API连接时测试诊断逻辑
  336. """
  337. logger.info("=" * 60)
  338. logger.info(" 水厂异常诊断系统 - 本地数据测试")
  339. logger.info("=" * 60)
  340. output_file = os.path.join(BASE_DIR, "output_format.txt")
  341. # 尝试读取本地测试数据
  342. test_file = os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}1.csv")
  343. if not os.path.exists(test_file):
  344. logger.error(f"测试数据文件不存在: {test_file}")
  345. result = {"status": "error", "message": f"测试文件不存在: {test_file}"}
  346. append_result_to_file(result, output_file)
  347. return
  348. # 读取数据(40分钟 / 4秒采样间隔 = 600条,多读一些确保足够)
  349. logger.info(f"读取测试数据: {test_file}")
  350. df = pd.read_csv(test_file)
  351. # 取前900条数据确保超过40分钟
  352. if len(df) > 900:
  353. df = df.iloc[:900]
  354. logger.info(f"测试数据大小: {df.shape} (预期至少600行)")
  355. # 执行诊断
  356. result = run_diagnosis(df)
  357. logger.info(f"诊断结果: {result}")
  358. # 追加结果到输出文件
  359. append_result_to_file(result, output_file)
  360. logger.info("本地测试完成")
  361. if __name__ == "__main__":
  362. import sys
  363. if len(sys.argv) > 1:
  364. if sys.argv[1] == "--local":
  365. # 使用本地数据测试
  366. test_with_local_data()
  367. elif sys.argv[1] == "--once":
  368. # 单次运行
  369. main()
  370. elif sys.argv[1] == "--help" or sys.argv[1] == "-h":
  371. print("用法:")
  372. print(" python run_diagnosis.py # 默认循环运行(每40分钟一次)")
  373. print(" python run_diagnosis.py 30 # 循环运行(每30分钟一次)")
  374. print(" python run_diagnosis.py --once # 单次运行")
  375. print(" python run_diagnosis.py --local # 本地数据测试")
  376. else:
  377. # 尝试解析为间隔分钟数
  378. try:
  379. interval = int(sys.argv[1])
  380. run_loop(interval_minutes=interval)
  381. except ValueError:
  382. print(f"未知参数: {sys.argv[1]}")
  383. print("使用 --help 查看用法")
  384. else:
  385. # 默认:循环运行模式(40分钟间隔)
  386. run_loop(interval_minutes=40)