# Standard library
import time
import json
import os
import statistics
import threading
import hashlib
from datetime import datetime, timedelta
import logging
from logging.handlers import RotatingFileHandler

# Third-party
import pymysql
import requests

# Project modules
from DQN_env import UFParams
from DQN_decide import run_uf_DQN_decide, generate_plc_instructions, calc_uf_cycle_metrics

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Log line format: time - thread (device) name - level - message
formatter = logging.Formatter(
    '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Rotating file log: at most 5 MB per file, 3 backups kept
file_handler = RotatingFileHandler('monitor_service.log', maxBytes=5 * 1024 * 1024,
                                   backupCount=3, encoding='utf-8')
file_handler.setFormatter(formatter)

# Console log
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)

logger.addHandler(file_handler)
logger.addHandler(console_handler)


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def load_config(config_file='config.json'):
    """
    Load the system configuration from a JSON file.

    Args:
        config_file: path of the configuration file.

    Returns:
        The parsed configuration dict.

    Raises:
        FileNotFoundError: the file does not exist (logged as critical).
        json.JSONDecodeError: the file is not valid JSON (logged as critical).
    """
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.critical(f"配置文件未找到 {config_file}")
        raise
    except json.JSONDecodeError as e:
        logger.critical(f"配置文件格式错误 {config_file}: {e}")
        raise


def get_current_config():
    """
    Re-read the configuration file on every call, so values such as
    system.use_model can be changed while the service is running.
    """
    return load_config()


# Configuration loaded once at start-up
config = load_config()

# API endpoints
API_BASE_URL = config['api']['base_url']
API_URL = API_BASE_URL + config['api']['current_data_endpoint']
CALLBACK_URL = API_BASE_URL + config['api']['callback_endpoint']
PLC_URL = API_BASE_URL + config['api']['plc_endpoint']

# HTTP headers used for the data API and the decision callback
HEADERS = {
    "Content-Type": "application/json",
    "JWT-TOKEN": config['api']['jwt_token']
}

# MySQL settings; environment variables take precedence over config.json
DB_USER = os.getenv('DB_USERNAME', config['database']['user'])
DB_PASSWORD = os.getenv('DB_PASSWORD', config['database']['password'])
DB_HOST = os.getenv('DB_HOST', config['database']['host'])
DB_NAME = os.getenv('DB_DATABASE', config['database']['database'])
DB_PORT = int(os.getenv('DB_PORT', str(config['database']['port'])))
HISTORY_TABLE_NAME = config['database']['table_name']

# Ultrafiltration model parameters and SCADA credentials
uf_params = UFParams()
PROJECT_ID_FOR_CALLBACK = config['scada']['project_id']
SCADA_SECRET = config['scada']['secret']

# Monitoring-loop parameters
TRIGGER_VALUE = config['system']['trigger_value']
NUM_VALUES_TO_COLLECT = config['system']['num_values_to_collect']
POLL_INTERVAL = config['system']['poll_interval']

# Devices to monitor (one thread each)
DEVICE_SEQUENCE = config['devices']

# ---------------------------------------------------------------------------
# State persistence
# ---------------------------------------------------------------------------
STATE_FILE = 'device_states.json'
_state_lock = threading.Lock()  # serializes access to STATE_FILE and device_states
device_states = {}  # in-memory cache: device name -> last saved state dict
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def load_device_states():
    """
    Load the run state of all devices from STATE_FILE into device_states.

    A missing, empty or unreadable file results in an empty state (logged).
    """
    global device_states
    with _state_lock:
        try:
            if os.path.exists(STATE_FILE):
                with open(STATE_FILE, 'r', encoding='utf-8') as f:
                    content = f.read()
                if content:
                    device_states = json.loads(content)
                    logger.info(f"状态文件加载成功 {STATE_FILE}")
                else:
                    logger.warning(f"状态文件为空 {STATE_FILE}")
                    device_states = {}
            else:
                logger.info(f"状态文件不存在,首次运行 {STATE_FILE}")
                device_states = {}
        except (json.JSONDecodeError, IOError) as e:
            logger.error(f"状态文件加载失败 {STATE_FILE}: {e}")
            device_states = {}


def save_device_state(device_name, state_data):
    """
    Persist one device's run state, preserving the entries of other devices.

    The file is written to a temporary path and then atomically swapped in
    with os.replace, so a failed write (e.g. a non-JSON-serializable value)
    can no longer leave a truncated state file behind. Failures are logged,
    not raised.

    Args:
        device_name: device name (key in the state file).
        state_data: JSON-serializable state dict for that device.
    """
    with _state_lock:
        # Start from the on-disk state so other devices' entries are kept.
        try:
            full_states = {}
            if os.path.exists(STATE_FILE):
                with open(STATE_FILE, 'r', encoding='utf-8') as f:
                    content = f.read()
                if content:
                    full_states = json.loads(content)
        except (json.JSONDecodeError, OSError) as e:
            # A corrupt/unreadable file must not block saving forever; the
            # in-memory cache holds everything loaded or saved so far.
            logger.warning(f"状态文件读取失败,使用内存状态 {STATE_FILE}: {e}")
            full_states = dict(device_states)

        full_states[device_name] = state_data

        tmp_path = STATE_FILE + '.tmp'
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(full_states, f, indent=4, ensure_ascii=False)
            os.replace(tmp_path, STATE_FILE)
        except (TypeError, ValueError, OSError) as e:
            logger.error(f"[{device_name}] 状态保存失败: {e}")
            return

        # Update the cache only after the file write succeeded.
        device_states[device_name] = state_data
        logger.info(f"[{device_name}] 状态保存成功")


# ---------------------------------------------------------------------------
# Database / HTTP helpers
# ---------------------------------------------------------------------------
def create_db_connection():
    """
    Open a MySQL connection that returns rows as dicts.

    Returns:
        A pymysql connection, or None if connecting failed (logged).
    """
    try:
        connection = pymysql.connect(
            host=DB_HOST,
            user=DB_USER,
            password=DB_PASSWORD,
            database=DB_NAME,
            port=DB_PORT,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        logger.debug("数据库连接成功")
        return connection
    except pymysql.MySQLError as e:
        logger.error(f"数据库连接失败: {e}")
        return None


def get_tmp_extremes(item_name, start_time, end_time, word_control):
    """
    Query the max/min historical value of `item_name` (TMP) in a time window.

    Only timestamps at which the control item `word_control` had value 26
    within [start_time, end_time] are considered.

    Args:
        item_name: history item whose values are aggregated.
        start_time: window start (datetime).
        end_time: window end (datetime).
        word_control: name of the control-word history item.

    Returns:
        (max_value, min_value) as floats, or (None, None) when the query
        fails or returns no data.
    """
    start_time_str = start_time.strftime(DATETIME_FORMAT)
    end_time_str = end_time.strftime(DATETIME_FORMAT)

    # NOTE(review): val = 26 is a hard-coded control-word value, presumably the
    # filtration phase — confirm. HISTORY_TABLE_NAME comes from config.json
    # (trusted); all other values are bound as query parameters.
    query = f"""
        SELECT MAX(val) AS max_val, MIN(val) AS min_val
        FROM {HISTORY_TABLE_NAME}
        WHERE project_id = %s
          AND item_name = %s
          AND h_time IN (
              SELECT h_time FROM {HISTORY_TABLE_NAME}
              WHERE project_id = %s
                AND item_name = %s
                AND val = 26
                AND h_time BETWEEN %s AND %s
          )
    """

    logger.info(f"查询历史极值 {item_name} 从 {start_time_str} 到 {end_time_str}")
    logger.debug(query)

    db_connection = create_db_connection()
    if not db_connection:
        return None, None

    try:
        with db_connection.cursor() as cursor:
            cursor.execute(query, (PROJECT_ID_FOR_CALLBACK, item_name,
                                   PROJECT_ID_FOR_CALLBACK, word_control,
                                   start_time_str, end_time_str))
            result = cursor.fetchone()
            logger.debug(f"查询结果: {result}")

            if result and result['max_val'] is not None and result['min_val'] is not None:
                max_val = float(result['max_val'])
                min_val = float(result['min_val'])
                logger.info(f"查询成功 最大值={max_val} 最小值={min_val}")
                return max_val, min_val
            logger.warning("查询未返回有效数据")
            return None, None
    except pymysql.MySQLError as e:
        logger.error(f"数据库查询错误: {e}")
        return None, None
    finally:
        if db_connection and db_connection.open:
            db_connection.close()


def generate_md5_signature(record_data, secret, timestamp):
    """
    Build the PLC request signature: uppercase hex MD5 of
    record_data + secret + timestamp (string concatenation).
    """
    cal_str = f"{record_data}{secret}{timestamp}"
    return hashlib.md5(cal_str.encode('utf-8')).hexdigest().upper()


def send_plc_update(device_name, item, old_value, new_value, command_type):
    """
    Send one parameter-change command to the PLC endpoint, with retries.

    The request body is a one-element JSON list; `sign` and `timestamp` are
    passed as URL query parameters. Up to 3 attempts, 60 s apart.

    Args:
        device_name: device name (used for logging only).
        item: PLC item name to change.
        old_value: current value (string).
        new_value: requested value (string).
        command_type: command type code.

    Returns:
        True if the endpoint answered with code 200, otherwise False.
    """
    timestamp = int(time.time())
    record_obj = {
        "project_id": PROJECT_ID_FOR_CALLBACK,
        "item": item,
        "old_value": old_value,
        "new_value": new_value,
        "command_type": command_type
    }
    # The signature is computed over the JSON text of the one-element list.
    record_data = json.dumps([record_obj])
    signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp)
    # Previously built by hand as "?sign=...×tamp=..." (a mangled "&timestamp"),
    # so the timestamp parameter never reached the server. Let requests encode it.
    query_params = {"sign": signature, "timestamp": timestamp}
    payload = [record_obj]

    logger.info(f"[{device_name}] PLC指令 {item} 从 {old_value} 到 {new_value}")
    logger.debug(f"[{device_name}] 签名数据 {record_data}")
    logger.debug(f"[{device_name}] 签名值 {signature}")

    max_retries, retry_interval = 3, 60
    for attempt in range(1, max_retries + 1):
        try:
            logger.info(f"[{device_name}] 发送PLC指令 尝试 {attempt}/{max_retries}")
            # NOTE(review): HEADERS (JWT) are not sent here; authentication
            # appears to rely on the signature only — confirm with the API.
            response = requests.post(PLC_URL, params=query_params, json=payload, timeout=15)
            response_json = response.json()
            if response_json.get('code') == 200:
                logger.info(f"[{device_name}] PLC指令发送成功 响应 {response_json}")
                return True
            logger.error(f"[{device_name}] PLC指令发送失败 {response_json.get('msg', '未知错误')}")
        except requests.exceptions.RequestException as e:
            logger.error(f"[{device_name}] PLC指令网络错误 {e}")
        except Exception as e:
            logger.error(f"[{device_name}] PLC指令未知错误 {e}")

        if attempt < max_retries:
            logger.info(f"[{device_name}] 等待{retry_interval}秒后重试")
            time.sleep(retry_interval)

    logger.error(f"[{device_name}] PLC指令发送失败,已达最大重试次数")
    return False


def send_decision_to_callback(type_name, **kwargs):
    """
    POST a decision result to the callback endpoint, with retries.

    Up to 3 attempts, 60 s apart; any non-2xx status counts as a failure.

    Args:
        type_name: device name, sent as the record's "type".
        **kwargs: decision fields merged into the record.

    Returns:
        True on success, otherwise False.
    """
    payload = {"list": [{"type": type_name, "project_id": PROJECT_ID_FOR_CALLBACK, **kwargs}]}
    logger.info(f"[{type_name}] 发送决策数据\n{json.dumps(payload, indent=2, ensure_ascii=False)}")

    max_retries, retry_interval = 3, 60
    for attempt in range(1, max_retries + 1):
        try:
            logger.info(f"[{type_name}] 发送回调 尝试 {attempt}/{max_retries}")
            response = requests.post(CALLBACK_URL, headers=HEADERS, json=payload, timeout=15)
            response.raise_for_status()
            logger.info(f"[{type_name}] 回调发送成功 响应 {response.text}")
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"[{type_name}] 回调发送失败 {e}")
            if attempt < max_retries:
                logger.info(f"[{type_name}] 等待{retry_interval}秒后重试")
                time.sleep(retry_interval)

    logger.error(f"[{type_name}] 回调发送失败,已达最大重试次数")
    return False


def get_device_value(payload, device_name):
    """
    Read the current value of one data item from the data API.

    Args:
        payload: request object (sent wrapped in a list); must contain 'deviceItems'.
        device_name: device name (used for logging only).

    Returns:
        The value as float, or None on any network/API/parse failure.
    """
    try:
        response = requests.post(API_URL, headers=HEADERS, json=[payload], timeout=10)
        response.raise_for_status()
        api_response = response.json()
        if api_response.get("code") == 200 and api_response.get("data"):
            val_str = api_response["data"][0].get("val")
            if val_str is not None:
                return float(val_str)
        else:
            logger.error(f"[{device_name}] 获取数据失败 {payload['deviceItems']} {api_response.get('msg', '未知错误')}")
    except requests.exceptions.RequestException as e:
        logger.error(f"[{device_name}] API网络错误 {payload['deviceItems']} {e}")
    except (json.JSONDecodeError, ValueError, IndexError, KeyError, TypeError, AttributeError) as e:
        # Malformed payloads (non-dict body/items, wrong types) are parse errors,
        # not fatal loop errors.
        logger.error(f"[{device_name}] API数据解析错误 {payload['deviceItems']} {e}")
    return None


# ---------------------------------------------------------------------------
# Per-device monitoring loop
# ---------------------------------------------------------------------------
def _wait_for_trigger(device, name):
    """Block until the control word is successfully read and equals TRIGGER_VALUE."""
    logger.info(f"等待触发 控制字需等于 {TRIGGER_VALUE}")
    while True:
        control_value = get_device_value(device["control_payload"], name)
        if control_value is not None and int(control_value) == TRIGGER_VALUE:
            logger.info("触发条件满足")
            return
        time.sleep(POLL_INTERVAL)


def _collect_tmp_values(device, name):
    """
    Collect NUM_VALUES_TO_COLLECT TMP readings, recording a reading only when it
    differs from the previous one.

    Returns:
        The list of collected values, or None if the baseline reading failed.
    """
    logger.info(f"开始收集TMP数据 需要 {NUM_VALUES_TO_COLLECT} 个有效数据点")
    collected_values = []
    last_known_value = get_device_value(device["target_payload"], name)
    if last_known_value is None:
        return None

    logger.info(f"TMP基准值 {last_known_value}")
    while len(collected_values) < NUM_VALUES_TO_COLLECT:
        current_value = get_device_value(device["target_payload"], name)
        # Failed reads and unchanged readings are skipped.
        if current_value is not None and current_value != last_known_value:
            collected_values.append(current_value)
            logger.info(f"TMP变化 {last_known_value:.4f} 到 {current_value:.4f} 已收集 {len(collected_values)}/{NUM_VALUES_TO_COLLECT}")
            last_known_value = current_value
        time.sleep(POLL_INTERVAL)
    return collected_values


def _wait_for_reset(device, name):
    """
    Block until the control word is successfully read and differs from TRIGGER_VALUE.

    A failed read is NOT treated as a reset: otherwise a transient API error
    while the trigger is still active would start a duplicate decision cycle
    (duplicate callback and PLC commands).
    """
    logger.info(f"等待重置 控制字需不等于 {TRIGGER_VALUE}")
    while True:
        control_value = get_device_value(device["control_payload"], name)
        if control_value is not None and int(control_value) != TRIGGER_VALUE:
            logger.info("重置条件满足,开始新一轮")
            return
        time.sleep(POLL_INTERVAL)


def _is_model_enabled():
    """
    Return True when config.json currently has system.use_model == 1.

    The file is re-read on every call. If it cannot be read or lacks the key,
    the error is logged and False is returned, so no PLC command is sent on an
    unknown switch state.
    """
    try:
        return get_current_config()['system']['use_model'] == 1
    except (OSError, ValueError, KeyError, TypeError) as e:
        logger.error(f"读取模型开关失败,按关闭处理 {e}")
        return False


def monitor_device(device):
    """
    Monitoring loop for one device (runs forever in its own thread).

    Each cycle:
        1. wait until the control word equals TRIGGER_VALUE;
        2. collect NUM_VALUES_TO_COLLECT changing TMP readings;
        3. compute the DQN decision, post it to the callback and, if the
           model switch is on, send PLC commands; then persist state;
        4. wait until the control word leaves TRIGGER_VALUE.
    Any unexpected exception is logged and the cycle restarts after 60 s.

    Args:
        device: device config dict (name, control/target/production_time/
            backwashing payloads, press_pv_item).
    """
    name = device["name"]
    threading.current_thread().name = name
    logger.info("监控线程启动")

    # Restore the state persisted by the previous run (see load_device_states).
    device_state = device_states.get(name, {})
    model_prev_L_s = device_state.get('model_prev_L_s')  # previous filtration time
    model_prev_t_bw_s = device_state.get('model_prev_t_bw_s')  # previous backwash time
    last_cycle_end_time_str = device_state.get('last_cycle_end_time')

    last_cycle_end_time = None
    if last_cycle_end_time_str:
        try:
            last_cycle_end_time = datetime.strptime(last_cycle_end_time_str, DATETIME_FORMAT)
            logger.info(f"历史状态加载成功,上次运行时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
        except ValueError:
            logger.warning(f"时间戳解析失败 {last_cycle_end_time_str}")
    else:
        logger.info("首次运行,无历史状态")

    while True:
        try:
            # Phase 1: wait for the trigger
            _wait_for_trigger(device, name)

            # Phase 2: collect TMP data
            collected_values = _collect_tmp_values(device, name)
            if collected_values is None:
                logger.warning("无法获取TMP基准值,跳过本轮")
                # Back off: the trigger is usually still active, so retrying
                # immediately would busy-loop against a failing API.
                time.sleep(POLL_INTERVAL)
                continue

            # Phase 3: decision
            logger.info("数据收集完成,开始决策计算")
            if collected_values:
                median_value = statistics.median(collected_values)
                logger.info(f"TMP中位数 {median_value:.4f}")

                # History window: since the last decision, or the past 48 h.
                current_decision_time = datetime.now()
                start_query_time = last_cycle_end_time if last_cycle_end_time else current_decision_time - timedelta(hours=48)
                control_item = device["control_payload"]["deviceItems"]
                # NOTE(review): both extremes may be None (no data / DB error);
                # confirm calc_uf_cycle_metrics accepts None.
                max_tmp, min_tmp = get_tmp_extremes(device["press_pv_item"], start_query_time,
                                                    current_decision_time, control_item)

                logger.info("调用DQN决策模型")
                uf_bw_dict = run_uf_DQN_decide(uf_params, median_value)
                logger.info(f"模型决策结果 {uf_bw_dict}")

                # Current PLC settings; fall back to defaults when unreadable (or 0).
                prod_time = get_device_value(device["production_time_payload"], name) or 3800
                bw_time = get_device_value(device["backwashing_payload"], name) or 100

                L_s, t_bw_s = generate_plc_instructions(
                    prod_time, bw_time,                      # current PLC values
                    model_prev_L_s, model_prev_t_bw_s,       # previous model output
                    uf_bw_dict["L_s"], uf_bw_dict["t_bw_s"]  # new model suggestion
                )

                logger.info(f"计算运行指标 TMP={median_value} L_s={L_s} t_bw_s={t_bw_s}")
                metrics = calc_uf_cycle_metrics(uf_params, median_value, max_tmp, min_tmp, L_s, t_bw_s)

                send_decision_to_callback(
                    type_name=name,
                    water_production_time=int(L_s),
                    physical_backwash=int(t_bw_s),
                    ceb_backwash_frequency=int(metrics["k_bw_per_ceb"]),
                    duration_system=int(prod_time),
                    tmp_action=median_value,
                    recovery_rate=metrics["recovery"],
                    ton_water_energy_kWh=metrics['ton_water_energy_kWh_per_m3'],
                    max_permeability=metrics['max_permeability'],
                    daily_prod_time_h=metrics['daily_prod_time_h'],
                    ctime=current_decision_time.strftime(DATETIME_FORMAT)
                )

                if _is_model_enabled():
                    logger.info("模型开关已开启,下发PLC指令")
                    send_plc_update(name, device["production_time_payload"]["deviceItems"],
                                    str(prod_time), str(int(L_s)), 1)
                    send_plc_update(name, device["backwashing_payload"]["deviceItems"],
                                    str(bw_time), str(int(t_bw_s)), 2)
                else:
                    logger.info("模型开关已关闭,跳过PLC指令")

                model_prev_L_s = L_s
                model_prev_t_bw_s = t_bw_s
                last_cycle_end_time = current_decision_time
                save_device_state(name, {
                    'model_prev_L_s': model_prev_L_s,
                    'model_prev_t_bw_s': model_prev_t_bw_s,
                    'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT)
                })
                logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")

            # Phase 4: wait for the control word to reset
            _wait_for_reset(device, name)
            logger.info(f"{name} 本轮完成\n")

        except Exception as e:
            logger.critical(f"监控循环异常 {e}", exc_info=True)
            logger.info("等待60秒后重试")
            time.sleep(60)


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """
    Load persisted device states, start one daemon monitoring thread per
    configured device, and keep the main thread alive until all threads
    exit or Ctrl+C is pressed.
    """
    logger.info("========================================")
    logger.info("超滤并行监控服务启动")
    logger.info("========================================")

    load_device_states()

    threads = []
    for device_config in DEVICE_SEQUENCE:
        thread = threading.Thread(target=monitor_device, args=(device_config,), daemon=True)
        threads.append(thread)
        thread.start()
        logger.info(f"设备 {device_config['name']} 监控线程已启动")

    try:
        while any(t.is_alive() for t in threads):
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("检测到中断信号,程序退出")


if __name__ == "__main__":
    main()