# -*- coding: utf-8 -*-
"""
run_diagnosis.py: real-time diagnosis script for PLC data.

What it does:
    1. Queries the history API point by point for the most recent window of
       sensor data (sampled per second).
    2. Downsamples every point to a 4-second grid and merges the points into
       one DataFrame.
    3. Runs WaterPlantDiagnoser on that DataFrame.
    4. Appends the diagnosis result to output_format.txt (one JSON per line).

Usage:
    python run_diagnosis.py            # loop mode, one run every 40 minutes
    python run_diagnosis.py 30         # loop mode, one run every 30 minutes
    python run_diagnosis.py --once     # single run against the API
    python run_diagnosis.py --local    # single run against a local CSV

Data specification:
    - Nominal window: 40 minutes = 2400 seconds
    - Sampling interval: one row every 4 seconds
    - Nominal row count: 2400 s / 4 s = 600 rows
    - PLC points: 158 columns

Depends on:
    - the WaterPlantDiagnoser class in test.py
    - the model file models/ppo_tracing_model.pth
    - the threshold table sensor_threshold.xlsx
"""

import os
import sys
import time
import json
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Optional
import logging

from config import config

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# ===================== API configuration =====================
# History-data endpoint (kept in sync with loop_main.py).
API_BASE_URL = "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data"

# NOTE(review): a credential is hard-coded below. It is kept as the fallback so
# existing deployments keep working, but the JINKE_JWT_TOKEN environment
# variable takes precedence when set; prefer that and rotate the embedded token.
_DEFAULT_JWT_TOKEN = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
    "eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ."
    "0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M"
)

# Request headers (kept in sync with loop_main.py).
HEADERS = {
    "Content-Type": "application/json",
    "JWT-TOKEN": os.environ.get("JINKE_JWT_TOKEN", _DEFAULT_JWT_TOKEN)
}

# Project ID
PROJECT_ID = "92"

# Device ID (defaults to "1"; adjust for the actual deployment)
DEVICE_ID = "1"

# Sampling configuration
SAMPLE_INTERVAL_SECONDS = 4  # one row every 4 seconds
DURATION_MINUTES = 40        # nominal query window in minutes
EXPECTED_ROWS = DURATION_MINUTES * 60 // SAMPLE_INTERVAL_SECONDS  # 2400 s / 4 s = 600 rows

# The only point for which API-level errors are logged in detail, so that a
# systematic failure is reported once instead of once per point.
_VERBOSE_ERROR_POINT = "C.M.UF1_PT_JS@out"


def load_plc_points(input_file: str) -> list:
    """
    Load the PLC point names from input_format.txt.

    Args:
        input_file: path of the point list, one point name per line.

    Returns:
        List of point names. Blank lines and any line equal to 'index'
        (case-insensitive) are skipped.
    """
    points = []
    # utf-8-sig transparently strips a leading BOM if the file has one.
    with open(input_file, 'r', encoding='utf-8-sig') as f:
        for line in f:
            point = line.strip()
            if point and point.lower() != 'index':
                points.append(point)

    logger.info(f"加载了 {len(points)} 个PLC点位")
    return points


def _parse_history_records(records) -> pd.Series:
    """Turn the API's list of {'htime_at', 'val'} records into a time-indexed Series."""
    timestamps = []
    values = []
    for record in records:
        raw_time = record.get("htime_at")
        raw_val = record.get("val")
        if raw_time is None or raw_val is None:
            continue
        try:
            value = float(raw_val)
        except (ValueError, TypeError):
            # A single non-numeric sample is dropped; the rest is still usable.
            continue
        timestamps.append(raw_time)
        values.append(value)

    if not timestamps:
        return pd.Series(dtype=float)

    # Unparseable timestamps become NaT and are dropped right after.
    series = pd.Series(
        values,
        index=pd.to_datetime(timestamps, format='mixed', errors='coerce')
    )
    series = series[~series.index.isna()]
    return series.sort_index()


def query_single_point_history(item_name: str, start_time: datetime, end_time: datetime,
                               session: Optional[requests.Session] = None) -> pd.Series:
    """
    Query the history of one PLC point through the API (per-second sampling).

    Args:
        item_name: data item name (PLC point).
        start_time: start of the query window.
        end_time: end of the query window.
        session: optional requests.Session to reuse the HTTP connection across
            calls; when omitted a one-off requests.get is issued.

    Returns:
        pd.Series indexed by timestamp and sorted by time. An empty Series is
        returned on any failure (network error, non-200 API code, empty or
        malformed payload); this function does not raise for those cases.
    """
    # The API takes millisecond timestamps.
    start_timestamp = int(start_time.timestamp() * 1000)
    end_timestamp = int(end_time.timestamp() * 1000)

    params = {
        "deviceid": DEVICE_ID,
        "dataitemid": item_name,
        "project_id": PROJECT_ID,
        "stime": start_timestamp,
        "etime": end_timestamp,
        "size": "1",
        "interval": "s",      # per-second sampling
        "aggregator": "new"   # take the latest value of each interval
    }

    http = session if session is not None else requests

    try:
        response = http.get(API_BASE_URL, params=params, headers=HEADERS, timeout=60)
        response.raise_for_status()
        data = response.json()

        if data.get("code") != 200:
            if item_name == _VERBOSE_ERROR_POINT:
                logger.error(f"API返回错误: code={data.get('code')}, msg={data.get('msg', 'N/A')}")
            return pd.Series(dtype=float)

        records = data.get("data")
        if not records:
            return pd.Series(dtype=float)

        return _parse_history_records(records)

    except requests.exceptions.RequestException as e:
        logger.warning(f"查询点位 {item_name} 网络失败: {e}")
        if item_name == _VERBOSE_ERROR_POINT:
            logger.error(f"API网络请求失败: {e}")
        return pd.Series(dtype=float)
    except (json.JSONDecodeError, ValueError, KeyError, AttributeError, TypeError) as e:
        # AttributeError/TypeError cover a payload that is valid JSON but not
        # shaped as a dict of dict records; one bad point must not abort the run.
        logger.warning(f"解析点位 {item_name} 响应失败: {e}")
        return pd.Series(dtype=float)


def resample_to_4s(series: pd.Series, start_time: datetime, end_time: datetime) -> pd.Series:
    """
    Downsample raw data onto the SAMPLE_INTERVAL_SECONDS (4 s) grid.

    Args:
        series: raw time-indexed samples.
        start_time: first timestamp of the target grid.
        end_time: upper bound of the target grid.

    Returns:
        Series indexed by the regular grid from start_time to end_time. It is
        all-NaN when `series` is empty; otherwise gaps are forward- then
        backward-filled.
    """
    step = f'{SAMPLE_INTERVAL_SECONDS}s'
    time_index = pd.date_range(start=start_time, end=end_time, freq=step)

    if series.empty:
        return pd.Series(np.nan, index=time_index)

    # Mean of every 4-second bucket.
    # NOTE(review): assumes the API timestamps are naive and in the same local
    # time as start_time/end_time - TODO confirm.
    resampled = series.resample(step).mean()

    # The resample buckets are not anchored at start_time, so snap each grid
    # point to the closest bucket that lies within one interval of it.
    resampled = resampled.reindex(
        time_index,
        method='nearest',
        tolerance=pd.Timedelta(seconds=SAMPLE_INTERVAL_SECONDS)
    )

    # Fill remaining gaps forward, then backward for a leading gap.
    return resampled.ffill().bfill()


def query_all_points_history(plc_points: list, duration_minutes: int = DURATION_MINUTES) -> pd.DataFrame:
    """
    Query every PLC point, downsample it and merge everything into a DataFrame.

    Failure handling:
        A point whose query fails or returns no data gets a column of zeros
        (a warning is logged for it). The numbers of succeeded and failed
        points are exposed as result.attrs['success_count'] and
        result.attrs['fail_count'] so callers can detect a fully failed run.

    Args:
        plc_points: list of PLC point names.
        duration_minutes: nominal window length in minutes (default 40).

    Returns:
        DataFrame whose first column 'index' holds the 4-second time grid and
        whose remaining columns are the PLC points. Because one extra minute is
        queried, the frame has (duration_minutes + 1) * 60 / 4 + 1 rows
        (616 for the default window).
        NOTE(review): the diagnoser is documented as taking 600 rows - confirm
        it tolerates the extra rows.
    """
    # Query one extra minute so the covered span is at least the nominal window.
    query_duration = duration_minutes + 1

    end_time = datetime.now()
    start_time = end_time - timedelta(minutes=query_duration)

    logger.info(f"查询时间范围: {start_time.strftime('%Y-%m-%d %H:%M:%S')} 至 {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"查询时长: {query_duration}分钟 (确保>={duration_minutes}分钟)")
    logger.info(f"开始逐个查询 {len(plc_points)} 个点位...")

    time_index = pd.date_range(start=start_time, end=end_time, freq=f'{SAMPLE_INTERVAL_SECONDS}s')
    num_rows = len(time_index)

    # Collect the columns in a dict and build the DataFrame once at the end;
    # inserting columns one by one would fragment the frame.
    data_dict = {'index': time_index}

    success_count = 0
    fail_count = 0
    total = len(plc_points)

    # One session for all points so the HTTP connection is reused.
    with requests.Session() as session:
        for i, point in enumerate(plc_points):
            if (i + 1) % 10 == 0 or i == 0:
                logger.info(f"查询进度: {i + 1}/{len(plc_points)} ({(i+1)/len(plc_points)*100:.1f}%)")

            raw_series = query_single_point_history(point, start_time, end_time, session=session)

            if raw_series.empty:
                data_dict[point] = np.zeros(num_rows)
                fail_count += 1
                logger.warning(f"点位 {point} 查询失败,填充0")
                continue

            resampled = resample_to_4s(raw_series, start_time, end_time)

            # resample_to_4s builds the same grid, so the lengths normally
            # match; truncate or NaN-pad defensively if they ever do not.
            point_data = np.full(num_rows, np.nan)
            usable = min(len(resampled), num_rows)
            point_data[:usable] = resampled.values[:usable]

            data_dict[point] = point_data
            success_count += 1

    logger.info(f"查询完成: 成功 {success_count}/{total}, 失败 {fail_count} 个")

    result_df = pd.DataFrame(data_dict)
    result_df.attrs["success_count"] = success_count
    result_df.attrs["fail_count"] = fail_count

    logger.info(f"最终DataFrame大小: {result_df.shape}")

    return result_df


def run_diagnosis(df: pd.DataFrame) -> dict:
    """
    Run WaterPlantDiagnoser on the sensor DataFrame.

    Args:
        df: sensor data including the time column.

    Returns:
        Whatever diagnoser.api_predict(df) returns, or
        {"status": "error", "message": ...} if loading or running the
        diagnoser fails. This function does not raise.
    """
    try:
        # Imported lazily to avoid a circular import; kept inside the try so a
        # failing import is reported like any other diagnosis error.
        from test import WaterPlantDiagnoser

        logger.info("初始化诊断器...")
        diagnoser = WaterPlantDiagnoser()

        logger.info(f"开始诊断,输入数据: {df.shape}")
        return diagnoser.api_predict(df)

    except FileNotFoundError as e:
        logger.error(f"模型文件缺失: {e}")
        return {"status": "error", "message": str(e)}
    except Exception as e:
        # Top-level boundary: log with traceback and hand an error record back.
        logger.exception(f"诊断过程出错: {e}")
        return {"status": "error", "message": str(e)}


def _json_default(obj):
    """Convert NumPy and datetime values for json.dumps; reject anything else."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, datetime):  # also covers pd.Timestamp
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def append_result_to_file(result: dict, output_file: str):
    """
    Append one diagnosis result to the output file as a single JSON line.

    Args:
        result: diagnosis result dictionary. A 'timestamp' key is added in
            front of it; a 'timestamp' already present in `result` wins.
        output_file: path of the file to append to.
    """
    result_with_time = {
        "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        **result
    }

    # default= lets NumPy scalars/arrays in the result serialize instead of
    # raising TypeError.
    result_str = json.dumps(result_with_time, ensure_ascii=False, default=_json_default)

    with open(output_file, 'a', encoding='utf-8') as f:
        f.write(result_str + '\n')

    logger.info(f"结果已追加到: {output_file}")


def main():
    """
    Run one complete diagnosis pass: load points, query, save, diagnose, record.
    """
    logger.info("=" * 60)
    logger.info(" 水厂异常诊断系统 - 实时诊断")
    logger.info("=" * 60)

    input_file = os.path.join(BASE_DIR, "input_format.txt")
    output_file = os.path.join(BASE_DIR, "output_format.txt")

    # Directory for the raw data snapshots.
    data_save_dir = os.path.join(BASE_DIR, "data_history")
    os.makedirs(data_save_dir, exist_ok=True)

    # Step 1: load the PLC point list.
    if not os.path.exists(input_file):
        logger.error(f"点位配置文件不存在: {input_file}")
        return

    plc_points = load_plc_points(input_file)

    if not plc_points:
        logger.error("未能加载任何PLC点位")
        return

    # Step 2: query the history point by point, downsampled to 4 seconds.
    df = query_all_points_history(plc_points, duration_minutes=DURATION_MINUTES)

    # The frame always has the full row count because failed points are
    # zero-filled, so the row check alone cannot detect a fully failed query;
    # the success counter recorded by query_all_points_history does.
    all_failed = df.attrs.get("success_count", 1) == 0
    if df.empty or len(df) < 100 or all_failed:
        result = {"status": "error", "message": "无法获取足够的历史数据"}
        append_result_to_file(result, output_file)
        return

    # Step 3: save the queried data (timestamped file name, 2 decimals).
    timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')
    data_filename = f"plc_data_{timestamp_str}.csv"
    data_filepath = os.path.join(data_save_dir, data_filename)

    # Round only on a copy so the diagnoser still sees full precision.
    df_to_save = df.copy()
    for col in df_to_save.columns:
        if col != 'index' and df_to_save[col].dtype in ['float64', 'float32']:
            df_to_save[col] = df_to_save[col].round(2)

    df_to_save.to_csv(data_filepath, index=False, encoding='utf-8')
    logger.info(f"原始数据已保存到: {data_filepath}")

    # Step 4: run the diagnosis.
    result = run_diagnosis(df)

    logger.info(f"诊断结果: {result}")

    # Step 5: append the result to the output file.
    append_result_to_file(result, output_file)

    logger.info("诊断完成")


def run_loop(interval_minutes: int = 40):
    """
    Loop mode: run one diagnosis pass every `interval_minutes` minutes.

    A failing pass is logged and the loop carries on after the usual wait.
    Ctrl-C stops the loop cleanly, whether it arrives during a pass or during
    the wait.

    Args:
        interval_minutes: minutes to wait between passes (default 40).

    Raises:
        ValueError: if interval_minutes is not positive (a zero interval would
            hammer the API, a negative one is rejected by time.sleep).
    """
    if interval_minutes <= 0:
        raise ValueError(f"interval_minutes must be positive, got {interval_minutes}")

    logger.info("=" * 60)
    logger.info(f" 水厂异常诊断系统 - 循环运行模式")
    logger.info(f" 运行间隔: 每 {interval_minutes} 分钟执行一次")
    logger.info("=" * 60)

    run_count = 0

    try:
        while True:
            run_count += 1
            logger.info(f"\n{'='*40}")
            logger.info(f"开始第 {run_count} 轮诊断...")
            logger.info(f"{'='*40}")

            try:
                main()
            except Exception as e:
                # Top-level boundary: keep the loop alive across a bad pass.
                logger.exception(f"诊断过程出错: {e}")
                logger.info(f"等待 {interval_minutes} 分钟后重试...")
            else:
                next_run_time = datetime.now() + timedelta(minutes=interval_minutes)
                logger.info(f"本轮完成,下一轮将在 {next_run_time.strftime('%Y-%m-%d %H:%M:%S')} 开始")
                logger.info(f"等待 {interval_minutes} 分钟...")

            # Single wait for both outcomes, inside the KeyboardInterrupt guard.
            time.sleep(interval_minutes * 60)

    except KeyboardInterrupt:
        logger.info("\n收到中断信号,停止循环运行")


# ===================== Local test helper =====================
def test_with_local_data():
    """
    Run the diagnosis on a local CSV file instead of the API.

    Useful for exercising the diagnosis logic without API connectivity. The
    result (or an error record if the CSV is missing) is appended to
    output_format.txt.
    """
    logger.info("=" * 60)
    logger.info(" 水厂异常诊断系统 - 本地数据测试")
    logger.info("=" * 60)

    output_file = os.path.join(BASE_DIR, "output_format.txt")

    test_file = os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}1.csv")

    if not os.path.exists(test_file):
        logger.error(f"测试数据文件不存在: {test_file}")
        result = {"status": "error", "message": f"测试文件不存在: {test_file}"}
        append_result_to_file(result, output_file)
        return

    # 40 minutes at 4-second sampling is 600 rows; read some more for margin.
    logger.info(f"读取测试数据: {test_file}")
    df = pd.read_csv(test_file)

    # Keep at most the first 900 rows.
    if len(df) > 900:
        df = df.iloc[:900]

    logger.info(f"测试数据大小: {df.shape} (预期至少600行)")

    result = run_diagnosis(df)

    logger.info(f"诊断结果: {result}")

    append_result_to_file(result, output_file)

    logger.info("本地测试完成")


if __name__ == "__main__":
    if len(sys.argv) > 1:
        arg = sys.argv[1]
        if arg == "--local":
            # Diagnose a local CSV.
            test_with_local_data()
        elif arg == "--once":
            # Single run against the API.
            main()
        elif arg in ("--help", "-h"):
            print("用法:")
            print(" python run_diagnosis.py # 默认循环运行(每40分钟一次)")
            print(" python run_diagnosis.py 30 # 循环运行(每30分钟一次)")
            print(" python run_diagnosis.py --once # 单次运行")
            print(" python run_diagnosis.py --local # 本地数据测试")
        else:
            # Anything else is expected to be the loop interval in minutes.
            # Parse it separately so an error raised by the loop itself is
            # never mistaken for a bad argument.
            try:
                interval = int(arg)
            except ValueError:
                print(f"未知参数: {arg}")
                print("使用 --help 查看用法")
            else:
                if interval <= 0:
                    print(f"间隔分钟数必须为正整数: {arg}")
                    print("使用 --help 查看用法")
                else:
                    run_loop(interval_minutes=interval)
    else:
        # Default: loop mode with a 40-minute interval.
        run_loop(interval_minutes=40)