"""Parallel monitoring service for ultrafiltration (UF) devices.

For every configured device a worker thread polls the device control word,
collects TMP samples, asks the DQN model for a decision, reports the decision
to a callback endpoint and optionally pushes new set-points to the PLC.
"""

# Standard library imports
import hashlib
import json
import logging
import os
import threading
import time
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler

# Third-party imports
import pymysql
import requests

# Project-local imports
from DQN_env import UFParams
from DQN_decide import run_uf_DQN_decide, generate_plc_instructions, calc_uf_cycle_metrics

# ---------------------------------------------------------------------------
# Logging configuration
# ---------------------------------------------------------------------------
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Log line format
formatter = logging.Formatter(
    '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# File handler: 5 MB per file, 3 backups kept
file_handler = RotatingFileHandler('monitor_service.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
file_handler.setFormatter(formatter)

# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)

logger.addHandler(file_handler)
logger.addHandler(console_handler)


# ---------------------------------------------------------------------------
# Configuration loading
# ---------------------------------------------------------------------------
def load_config(config_file='config.json'):
    """Load the service configuration from a JSON file.

    Args:
        config_file: Path of the configuration file.

    Returns:
        The parsed configuration (whatever ``json.load`` produces).

    Raises:
        FileNotFoundError: The file does not exist (logged, then re-raised).
        json.JSONDecodeError: The file is not valid JSON (logged, then re-raised).
    """
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.critical(f"配置文件未找到 {config_file}")
        raise
    except json.JSONDecodeError as e:
        logger.critical(f"配置文件格式错误 {config_file}: {e}")
        raise


def get_current_config():
    """Re-read the configuration file so runtime changes are picked up."""
    return load_config()


# Configuration snapshot taken at import time
config = load_config()

# API endpoints
API_BASE_URL = config['api']['base_url']
API_URL = API_BASE_URL + config['api']['current_data_endpoint']
CALLBACK_URL = API_BASE_URL + config['api']['callback_endpoint']
PLC_URL = API_BASE_URL + config['api']['plc_endpoint']

# HTTP request headers
HEADERS = {
    "Content-Type": "application/json",
    "JWT-TOKEN": config['api']['jwt_token']
}

# History-data API used by get_tmp_extremes.
# NOTE(review): URL, device id and project id are hard-coded here rather than
# taken from config / PROJECT_ID_FOR_CALLBACK - confirm this is intended.
HISTORY_API_URL = "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data"
HISTORY_DEVICE_ID = "1"
HISTORY_PROJECT_ID = "92"

# MySQL settings; environment variables take precedence over the config file
DB_USER = os.getenv('DB_USERNAME', config['database']['user'])
DB_PASSWORD = os.getenv('DB_PASSWORD', config['database']['password'])
DB_HOST = os.getenv('DB_HOST', config['database']['host'])
DB_NAME = os.getenv('DB_DATABASE', config['database']['database'])
DB_PORT = int(os.getenv('DB_PORT', str(config['database']['port'])))
HISTORY_TABLE_NAME = config['database']['table_name']

# Ultrafiltration system parameters
uf_params = UFParams()
PROJECT_ID_FOR_CALLBACK = config['scada']['project_id']
SCADA_SECRET = config['scada']['secret']

# Monitoring flow parameters
TRIGGER_VALUE = config['system']['trigger_value']
NUM_VALUES_TO_COLLECT = config['system']['num_values_to_collect']
POLL_INTERVAL = config['system']['poll_interval']

# Device list
DEVICE_SEQUENCE = config['devices']

# State persistence
STATE_FILE = 'device_states.json'
_state_lock = threading.Lock()
device_states = {}
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


# ---------------------------------------------------------------------------
# State persistence helpers
# ---------------------------------------------------------------------------
def load_device_states():
    """Load the persisted state of all devices into ``device_states``.

    A missing, empty, unreadable or malformed state file results in an empty
    state dictionary; the problem is logged and no exception is raised.
    """
    global device_states
    with _state_lock:
        try:
            if os.path.exists(STATE_FILE):
                with open(STATE_FILE, 'r', encoding='utf-8') as f:
                    content = f.read()
                if content:
                    loaded = json.loads(content)
                    if isinstance(loaded, dict):
                        device_states = loaded
                        logger.info(f"状态文件加载成功 {STATE_FILE}")
                    else:
                        # Callers use device_states.get(); anything but a
                        # mapping would crash every monitor thread.
                        logger.warning(f"状态文件格式无效 {STATE_FILE}")
                        device_states = {}
                else:
                    logger.warning(f"状态文件为空 {STATE_FILE}")
                    device_states = {}
            else:
                logger.info(f"状态文件不存在,首次运行 {STATE_FILE}")
                device_states = {}
        except (json.JSONDecodeError, IOError) as e:
            logger.error(f"状态文件加载失败 {STATE_FILE}: {e}")
            device_states = {}


def _read_persisted_states(device_name):
    """Return the mapping stored in STATE_FILE, or the in-memory cache if the file is unusable."""
    if not os.path.exists(STATE_FILE):
        return {}
    with open(STATE_FILE, 'r', encoding='utf-8') as f:
        content = f.read()
    if not content:
        return {}
    try:
        stored = json.loads(content)
    except json.JSONDecodeError as e:
        # A corrupted file must not block every future save.
        logger.warning(f"[{device_name}] 状态文件损坏,使用内存状态重建: {e}")
        return dict(device_states)
    if not isinstance(stored, dict):
        logger.warning(f"[{device_name}] 状态文件格式无效,使用内存状态重建")
        return dict(device_states)
    return stored


def save_device_state(device_name, state_data):
    """Persist the state of one device and refresh the in-memory cache.

    The full state mapping is serialized before anything is written and is
    then swapped into place with ``os.replace``, so a serialization or write
    failure never leaves a truncated state file behind.

    Args:
        device_name: Name of the device (key in the state file).
        state_data: State dictionary to store for the device.
    """
    with _state_lock:
        try:
            full_states = _read_persisted_states(device_name)
            full_states[device_name] = state_data

            # Serialize first: a TypeError here must not touch the file.
            serialized = json.dumps(full_states, indent=4, ensure_ascii=False)
            tmp_path = f"{STATE_FILE}.tmp"
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(serialized)
            os.replace(tmp_path, STATE_FILE)

            # Update the in-memory cache
            device_states[device_name] = state_data
            logger.info(f"[{device_name}] 状态保存成功")
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"[{device_name}] 状态保存失败: {e}")


# ---------------------------------------------------------------------------
# Core business functions
# ---------------------------------------------------------------------------
def create_db_connection():
    """Open a MySQL connection using the module-level DB settings.

    Returns:
        A ``pymysql`` connection using ``DictCursor``, or ``None`` when the
        connection attempt raises ``pymysql.MySQLError``.
    """
    try:
        connection = pymysql.connect(
            host=DB_HOST,
            user=DB_USER,
            password=DB_PASSWORD,
            database=DB_NAME,
            port=DB_PORT,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        logger.debug("数据库连接成功")
        return connection
    except pymysql.MySQLError as e:
        logger.error(f"数据库连接失败: {e}")
        return None


def _matches_control_value(raw_value, expected):
    """Return True when ``raw_value`` (str or number) is numerically equal to ``expected``."""
    try:
        return float(raw_value) == expected
    except (TypeError, ValueError):
        return False


def _query_history_data(data_item_id, start_timestamp, end_timestamp, call_label):
    """Issue one history-data GET request and return the decoded JSON body.

    ``call_label`` is only used as a prefix in the log messages.
    """
    params = {
        "deviceid": HISTORY_DEVICE_ID,
        "dataitemid": data_item_id,
        "project_id": HISTORY_PROJECT_ID,
        "stime": start_timestamp,
        "etime": end_timestamp,
        "size": "1",
        "interval": "minute",
        "aggregator": "new"
    }
    logger.info(f"{call_label}API调用: {HISTORY_API_URL} 参数: {params}")
    response = requests.get(HISTORY_API_URL, params=params, headers=HEADERS, timeout=30)
    response.raise_for_status()
    data = response.json()
    logger.debug(f"{call_label}API响应: {data}")
    return data


def get_tmp_extremes(item_name, start_time, end_time, word_control):
    """Query the history API for the TMP extremes while the control word is 26.

    Two series are fetched (``item_name`` and ``word_control``) and joined on
    the ``htime_at`` field of each record; only TMP values whose timestamp has
    a control-word record equal to 26 are considered.

    Args:
        item_name: Data item id of the TMP series.
        start_time: Start of the query window (``datetime``).
        end_time: End of the query window (``datetime``).
        word_control: Data item id of the control-word series.

    Returns:
        ``(max_value, min_value)`` with the minimum floored at 0.01, or
        ``(None, None)`` when no matching data exists or any error occurs.
    """
    # The API expects millisecond timestamps
    start_timestamp = int(start_time.timestamp() * 1000)
    end_timestamp = int(end_time.timestamp() * 1000)

    logger.info(f"查询历史极值 {item_name} 从 {start_time.strftime(DATETIME_FORMAT)} 到 {end_time.strftime(DATETIME_FORMAT)}")

    try:
        data1 = _query_history_data(item_name, start_timestamp, end_timestamp, "第一次")
        data2 = _query_history_data(word_control, start_timestamp, end_timestamp, "第二次")

        max_val = None
        min_val = None

        # TMP values keyed by record time.
        # NOTE(review): the record names below are fixed to "UF1..." even
        # though item ids are passed in per device - confirm for other devices.
        tmp_by_time = {}
        if data1.get("code") == 200 and data1.get("data"):
            for item in data1["data"]:
                if item.get("name") == "UF1跨膜压差" and item.get("val") is not None:
                    tmp_by_time[item.get("htime_at")] = float(item.get("val"))
            if tmp_by_time:
                logger.info(f"第一次API查询成功,提取到跨膜压差数据:{tmp_by_time}")

        # Keep only TMP values whose timestamp has a control word of 26
        if data2.get("code") == 200 and data2.get("data"):
            control_26_values = []
            for item in data2["data"]:
                if item.get("name") == "UF1控制字" and _matches_control_value(item.get("val"), 26):
                    record_time = item.get("htime_at")
                    if record_time in tmp_by_time:
                        control_26_values.append(tmp_by_time[record_time])

            if control_26_values:
                logger.info("找到控制字为26的数据点,合并跨膜压差数据")
                max_val = max(control_26_values)
                min_val = min(control_26_values)
                # Lower bound applied to the minimum TMP
                if min_val < 0.01:
                    min_val = 0.01
                logger.info(f"控制字为26时的最大跨膜压差值={max_val},最小跨膜压差值={min_val}")

        if max_val is not None and min_val is not None:
            logger.info(f"API查询成功 最大跨膜压差值={max_val} 最小跨膜压差值={min_val}")
            return max_val, min_val

        logger.warning("未找到有效的控制字为26时的跨膜压差数据")
        return None, None

    except requests.exceptions.RequestException as e:
        logger.error(f"API请求错误: {e}")
        return None, None
    except (json.JSONDecodeError, ValueError, KeyError) as e:
        logger.error(f"API响应解析错误: {e}")
        return None, None
    except Exception as e:
        # Best-effort query: any other failure is reported as "no data"
        logger.error(f"API查询未知错误: {e}")
        return None, None


def generate_md5_signature(record_data, secret, timestamp):
    """Return the upper-case hex MD5 of ``record_data + secret + timestamp``."""
    cal_str = f"{record_data}{secret}{timestamp}"
    return hashlib.md5(cal_str.encode('utf-8')).hexdigest().upper()


def send_plc_update(device_name, item, old_value, new_value, command_type):
    """Send one parameter-update command to the PLC endpoint.

    The request is signed with ``generate_md5_signature`` over the JSON text
    of the record list, ``SCADA_SECRET`` and the current Unix timestamp, and
    is retried up to three times with a 60 second pause between attempts.

    Args:
        device_name: Device name (used for logging only).
        item: Name of the parameter item to change.
        old_value: Previous value.
        new_value: New value.
        command_type: Command type code.

    Returns:
        ``True`` when a response with ``code == 200`` was received,
        ``False`` after all attempts failed.
    """
    timestamp = int(time.time())
    record_obj = {
        "project_id": PROJECT_ID_FOR_CALLBACK,
        "item": item,
        "old_value": old_value,
        "new_value": new_value,
        "command_type": command_type
    }
    record_data = json.dumps([record_obj])
    signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp)
    # Query parameters are "sign" and "timestamp"; the separator must be a
    # literal "&" (it had been corrupted into the "×" character).
    url = f"{PLC_URL}?sign={signature}&timestamp={timestamp}"
    # NOTE(review): the body is re-serialized by requests; presumably it must
    # match record_data byte-for-byte for the signature to verify - confirm.
    payload = [record_obj]

    logger.info(f"[{device_name}] PLC指令 {item} 从 {old_value} 到 {new_value}")
    logger.debug(f"[{device_name}] 签名数据 {record_data}")
    logger.debug(f"[{device_name}] 签名值 {signature}")

    max_retries, retry_interval = 3, 60
    for attempt in range(1, max_retries + 1):
        try:
            logger.info(f"[{device_name}] 发送PLC指令 尝试 {attempt}/{max_retries}")
            response = requests.post(url, json=payload, timeout=15)
            response_json = response.json()
            if response_json.get('code') == 200:
                logger.info(f"[{device_name}] PLC指令发送成功 响应 {response_json}")
                return True
            else:
                logger.error(f"[{device_name}] PLC指令发送失败 {response_json.get('msg', '未知错误')}")
        except requests.exceptions.RequestException as e:
            logger.error(f"[{device_name}] PLC指令网络错误 {e}")
        except Exception as e:
            logger.error(f"[{device_name}] PLC指令未知错误 {e}")

        if attempt < max_retries:
            logger.info(f"[{device_name}] 等待{retry_interval}秒后重试")
            time.sleep(retry_interval)

    logger.error(f"[{device_name}] PLC指令发送失败,已达最大重试次数")
    return False


def send_decision_to_callback(type_name, **kwargs):
    """Post a decision record to the callback endpoint.

    Retried up to three times with a 60 second pause between attempts.

    Args:
        type_name: Device type name, sent as ``type``.
        **kwargs: Additional decision fields merged into the record.

    Returns:
        ``True`` on an HTTP success response, ``False`` after all attempts failed.
    """
    payload = {"list": [{"type": type_name, "project_id": PROJECT_ID_FOR_CALLBACK, **kwargs}]}
    logger.info(f"[{type_name}] 发送决策数据\n{json.dumps(payload, indent=2, ensure_ascii=False)}")

    max_retries, retry_interval = 3, 60
    for attempt in range(1, max_retries + 1):
        try:
            logger.info(f"[{type_name}] 发送回调 尝试 {attempt}/{max_retries}")
            response = requests.post(CALLBACK_URL, headers=HEADERS, json=payload, timeout=15)
            response.raise_for_status()
            logger.info(f"[{type_name}] 回调发送成功 响应 {response.text}")
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"[{type_name}] 回调发送失败 {e}")

        if attempt < max_retries:
            logger.info(f"[{type_name}] 等待{retry_interval}秒后重试")
            time.sleep(retry_interval)

    logger.error(f"[{type_name}] 回调发送失败,已达最大重试次数")
    return False


def get_device_value(payload, device_name):
    """Fetch the current value of one device data item.

    Args:
        payload: Request item; it is sent wrapped in a one-element list and
            its ``deviceItems`` entry is used in error logs.
        device_name: Device name (used for logging only).

    Returns:
        The value as ``float``, or ``None`` when the request fails, the
        response is not successful, or the value is missing / not numeric.
    """
    try:
        response = requests.post(API_URL, headers=HEADERS, json=[payload], timeout=10)
        response.raise_for_status()
        api_response = response.json()

        if api_response.get("code") == 200 and api_response.get("data"):
            val_str = api_response["data"][0].get("val")
            if val_str is not None:
                return float(val_str)
        else:
            logger.error(f"[{device_name}] 获取数据失败 {payload['deviceItems']} {api_response.get('msg', '未知错误')}")
    except requests.exceptions.RequestException as e:
        logger.error(f"[{device_name}] API网络错误 {payload['deviceItems']} {e}")
    except (json.JSONDecodeError, ValueError, IndexError) as e:
        logger.error(f"[{device_name}] API数据解析错误 {payload['deviceItems']} {e}")
    return None


# ---------------------------------------------------------------------------
# Device monitoring loop
# ---------------------------------------------------------------------------
def _wait_for_control(device, name, predicate):
    """Poll the device control word every POLL_INTERVAL until ``predicate(value)`` is true."""
    while True:
        if predicate(get_device_value(device["control_payload"], name)):
            return
        time.sleep(POLL_INTERVAL)


def _collect_target_values(device, name, duration):
    """Poll the device target item for ``duration`` and return the non-None readings."""
    samples = []
    # Monotonic clock: the window length is immune to wall-clock adjustments
    deadline = time.monotonic() + duration.total_seconds()
    while time.monotonic() < deadline:
        current_value = get_device_value(device["target_payload"], name)
        if current_value is not None:
            samples.append(current_value)
            logger.info(f"收集TMP值 {current_value:.4f} 已收集 {len(samples)} 个数据点")
        time.sleep(POLL_INTERVAL)
    return samples


def monitor_device(device):
    """Run the endless monitoring loop for one device (thread target).

    Each cycle:
        1. Wait until the control word equals ``TRIGGER_VALUE``.
        2. Wait until the control word equals 26.
        3. Collect TMP readings for 10 minutes and average them.
        4. Run the DQN decision, report it to the callback endpoint and,
           when ``system.use_model`` is 1 in the current config, push the
           new set-points to the PLC.
        5. Persist the cycle state and wait until the control word differs
           from ``TRIGGER_VALUE`` (or cannot be read).

    Any exception inside a cycle is logged and the loop restarts after 60 s.

    Args:
        device: Device configuration dictionary.
    """
    name = device["name"]
    threading.current_thread().name = name
    logger.info("监控线程启动")

    # Restore state persisted by a previous run
    device_state = device_states.get(name, {})
    model_prev_L_s = device_state.get('model_prev_L_s')
    model_prev_t_bw_s = device_state.get('model_prev_t_bw_s')
    last_cycle_end_time_str = device_state.get('last_cycle_end_time')

    last_cycle_end_time = None
    if last_cycle_end_time_str:
        try:
            last_cycle_end_time = datetime.strptime(last_cycle_end_time_str, DATETIME_FORMAT)
            logger.info(f"历史状态加载成功,上次运行时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
        except ValueError:
            logger.warning(f"时间戳解析失败 {last_cycle_end_time_str}")
    else:
        logger.info("首次运行,无历史状态")

    while True:
        try:
            # Stage 1: wait for the trigger value
            logger.info(f"等待触发 控制字需等于 {TRIGGER_VALUE}")
            _wait_for_control(device, name, lambda v: v is not None and int(v) == TRIGGER_VALUE)
            logger.info("触发条件满足,开始等待控制字变为26")

            # Stage 1.5: wait for the control word to become 26
            logger.info("等待控制字变为26")
            _wait_for_control(device, name, lambda v: v is not None and int(v) == 26)
            logger.info("控制字变为26,开始收集10分钟数据")

            # Stage 2: collect 10 minutes of TMP readings
            logger.info("开始收集10分钟TMP数据")
            collected_values = _collect_target_values(device, name, timedelta(minutes=10))
            if not collected_values:
                logger.warning("10分钟内未收集到有效数据,跳过本轮")
                continue

            # Stage 3: decision
            logger.info("数据收集完成,开始决策计算")
            average_value = sum(collected_values) / len(collected_values)
            logger.info(f"TMP平均值 {average_value:.4f}")

            # History window: since the previous cycle, or the last 48 h on first run
            current_decision_time = datetime.now()
            start_query_time = last_cycle_end_time if last_cycle_end_time else current_decision_time - timedelta(hours=48)
            _word_controldevice = device["control_payload"]["deviceItems"]
            max_tmp, min_tmp = get_tmp_extremes(device["press_pv_item"], start_query_time, current_decision_time, _word_controldevice)

            logger.info("调用DQN决策模型")
            uf_bw_dict = run_uf_DQN_decide(uf_params, average_value)
            logger.info(f"模型决策结果 {uf_bw_dict}")

            # Current PLC set-points; fall back to defaults when unavailable (or 0)
            prod_time = get_device_value(device["production_time_payload"], name) or 3800
            bw_time = get_device_value(device["backwashing_payload"], name) or 100

            L_s, t_bw_s = generate_plc_instructions(
                prod_time, bw_time,
                model_prev_L_s, model_prev_t_bw_s,
                uf_bw_dict["L_s"], uf_bw_dict["t_bw_s"]
            )

            logger.info(f"计算运行指标 TMP={average_value} L_s={L_s} t_bw_s={t_bw_s}")
            metrics = calc_uf_cycle_metrics(uf_params, average_value, max_tmp, min_tmp, L_s, t_bw_s)

            send_decision_to_callback(
                type_name=name,
                water_production_time=int(L_s),
                physical_backwash=int(t_bw_s),
                ceb_backwash_frequency=int(metrics["k_bw_per_ceb"]),
                duration_system=int(prod_time),
                tmp_action=average_value,
                recovery_rate=metrics["recovery"],
                ton_water_energy_kWh=metrics['ton_water_energy_kWh_per_m3'],
                max_permeability=metrics['max_permeability'],
                daily_prod_time_h=metrics['daily_prod_time_h'],
                ctime=current_decision_time.strftime(DATETIME_FORMAT)
            )

            # The model switch is re-read from the config file every cycle
            if get_current_config()['system']['use_model'] == 1:
                logger.info("模型开关已开启,下发PLC指令")
                send_plc_update(name, device["production_time_payload"]["deviceItems"], str(prod_time), str(int(L_s)), 1)
                send_plc_update(name, device["backwashing_payload"]["deviceItems"], str(bw_time), str(int(t_bw_s)), 2)
            else:
                logger.info("模型开关已关闭,跳过PLC指令")

            # Persist the cycle state
            model_prev_L_s = L_s
            model_prev_t_bw_s = t_bw_s
            last_cycle_end_time = current_decision_time
            state_to_save = {
                'model_prev_L_s': model_prev_L_s,
                'model_prev_t_bw_s': model_prev_t_bw_s,
                'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT)
            }
            save_device_state(name, state_to_save)
            logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")

            # Stage 4: wait for reset (an unreadable control word also counts)
            logger.info(f"等待重置 控制字需不等于 {TRIGGER_VALUE}")
            _wait_for_control(device, name, lambda v: v is None or int(v) != TRIGGER_VALUE)
            logger.info("重置条件满足,开始新一轮")

            logger.info(f"{name} 本轮完成\n")

        except Exception as e:
            # Thread boundary: keep the worker alive whatever went wrong
            logger.critical(f"监控循环异常 {e}", exc_info=True)
            logger.info("等待60秒后重试")
            time.sleep(60)


# ---------------------------------------------------------------------------
# Entry points
# ---------------------------------------------------------------------------
def main():
    """Start one daemon monitoring thread per configured device and block.

    Loads the persisted device states first, then keeps the main thread
    alive while any worker is running; Ctrl+C ends the program.
    """
    logger.info("========================================")
    logger.info("超滤并行监控服务启动")
    logger.info("========================================")

    load_device_states()

    threads = []
    for device_config in DEVICE_SEQUENCE:
        thread = threading.Thread(target=monitor_device, args=(device_config,), daemon=True)
        threads.append(thread)
        thread.start()
        logger.info(f"设备 {device_config['name']} 监控线程已启动")

    try:
        while any(t.is_alive() for t in threads):
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("检测到中断信号,程序退出")


def test_get_tmp_extremes():
    """Manual smoke test: call ``get_tmp_extremes`` for the last 24 hours and print the result."""
    print("=" * 50)
    print("测试get_tmp_extremes API调用")
    print("=" * 50)

    test_item_name = "C.M.UF1_DB@press_PV"
    test_word_control = "C.M.UF1_DB@word_control"

    end_time = datetime.now()
    start_time = end_time - timedelta(hours=24)

    print("测试参数:")
    print(f" 数据项: {test_item_name}")
    print(f" 控制字段: {test_word_control}")
    print(f" 开始时间: {start_time.strftime(DATETIME_FORMAT)}")
    print(f" 结束时间: {end_time.strftime(DATETIME_FORMAT)}")
    print()

    try:
        max_val, min_val = get_tmp_extremes(test_item_name, start_time, end_time, test_word_control)

        print("测试结果:")
        if max_val is not None and min_val is not None:
            print(" API调用成功")
            print(f" 最大值: {max_val}")
            print(f" 最小值: {min_val}")
        else:
            print(" API调用失败或未返回有效数据")
            print(f" 最大值: {max_val}")
            print(f" 最小值: {min_val}")
    except Exception as e:
        print(f" 测试过程中发生异常: {e}")

    print("=" * 50)


if __name__ == "__main__":
    # For a live API check, call test_get_tmp_extremes() here instead of main().
    main()