||
- # 标准库导入
- import time
- import json
- import os
- import statistics
- import threading
- import hashlib
- from datetime import datetime, timedelta
- import logging
- from logging.handlers import RotatingFileHandler
- # 第三方库导入
- import pymysql
- import requests
- # 自定义模块导入
- from DQN_env import UFParams
- from DQN_decide import run_uf_DQN_decide, generate_plc_instructions, calc_uf_cycle_metrics
- # 日志系统配置
# Module-wide logger: INFO level, mirrored to a rotating file and the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# One shared record layout: timestamp, thread name, level, message.
formatter = logging.Formatter(
    fmt='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Rotating file sink: at most 5 MB per file, three backups kept.
file_handler = RotatingFileHandler(
    filename='monitor_service.log',
    maxBytes=5 * 1024 * 1024,
    backupCount=3,
    encoding='utf-8',
)
# Console sink (StreamHandler's default stream).
console_handler = logging.StreamHandler()

# Attach the formatter and register the handlers, file first, then console.
for _handler in (file_handler, console_handler):
    _handler.setFormatter(formatter)
    logger.addHandler(_handler)
- # 配置加载函数
def load_config(config_file='config.json'):
    """Read the system configuration from a JSON file.

    Args:
        config_file: Path of the JSON configuration file.

    Returns:
        The parsed JSON document.

    Raises:
        FileNotFoundError: The file does not exist (logged as critical, then
            re-raised).
        json.JSONDecodeError: The file is not valid JSON (logged as critical,
            then re-raised).
    """
    try:
        with open(config_file, 'r', encoding='utf-8') as stream:
            raw_text = stream.read()
        parsed = json.loads(raw_text)
    except json.JSONDecodeError as e:
        logger.critical(f"配置文件格式错误 {config_file}: {e}")
        raise
    except FileNotFoundError:
        logger.critical(f"配置文件未找到 {config_file}")
        raise
    return parsed
def get_current_config():
    """Return the configuration as it is on disk right now.

    The default configuration file is re-read on every call, so edits made
    while the service is running are picked up by callers of this function.
    """
    fresh_config = load_config()
    return fresh_config
- # 初始化配置
# Configuration snapshot taken once at import time; the constants below are
# derived from it and are not refreshed afterwards.
config = load_config()

# --- HTTP API endpoints -------------------------------------------------
_api_cfg = config['api']
API_BASE_URL = _api_cfg['base_url']
API_URL = API_BASE_URL + _api_cfg['current_data_endpoint']
CALLBACK_URL = API_BASE_URL + _api_cfg['callback_endpoint']
PLC_URL = API_BASE_URL + _api_cfg['plc_endpoint']

# Headers sent with the data and callback requests.
HEADERS = {
    "Content-Type": "application/json",
    "JWT-TOKEN": _api_cfg['jwt_token'],
}

# --- MySQL connection settings (environment variables take precedence) ---
_db_cfg = config['database']
DB_USER = os.environ.get('DB_USERNAME', _db_cfg['user'])
DB_PASSWORD = os.environ.get('DB_PASSWORD', _db_cfg['password'])
DB_HOST = os.environ.get('DB_HOST', _db_cfg['host'])
DB_NAME = os.environ.get('DB_DATABASE', _db_cfg['database'])
DB_PORT = int(os.environ.get('DB_PORT', str(_db_cfg['port'])))
HISTORY_TABLE_NAME = _db_cfg['table_name']

# --- Ultrafiltration model parameters and SCADA identity -----------------
uf_params = UFParams()
_scada_cfg = config['scada']
PROJECT_ID_FOR_CALLBACK = _scada_cfg['project_id']
SCADA_SECRET = _scada_cfg['secret']

# --- Monitoring loop parameters ------------------------------------------
_system_cfg = config['system']
TRIGGER_VALUE = _system_cfg['trigger_value']
NUM_VALUES_TO_COLLECT = _system_cfg['num_values_to_collect']
POLL_INTERVAL = _system_cfg['poll_interval']

# Devices to monitor (one monitoring thread is started per entry in main()).
DEVICE_SEQUENCE = config['devices']

# --- Persisted per-device state ------------------------------------------
STATE_FILE = 'device_states.json'
_state_lock = threading.Lock()  # guards STATE_FILE and device_states
device_states = {}
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
- # 状态持久化函数
def load_device_states():
    """Populate the in-memory ``device_states`` cache from ``STATE_FILE``.

    The cache is reset to an empty dict when the file is missing, empty,
    unreadable, or not valid JSON; each of those cases is logged. The whole
    operation runs under ``_state_lock``.
    """
    global device_states
    with _state_lock:
        restored = {}
        try:
            if not os.path.exists(STATE_FILE):
                logger.info(f"状态文件不存在,首次运行 {STATE_FILE}")
            else:
                with open(STATE_FILE, 'r', encoding='utf-8') as state_file:
                    raw_text = state_file.read()
                if raw_text:
                    restored = json.loads(raw_text)
                    logger.info(f"状态文件加载成功 {STATE_FILE}")
                else:
                    logger.warning(f"状态文件为空 {STATE_FILE}")
        except (json.JSONDecodeError, IOError) as e:
            logger.error(f"状态文件加载失败 {STATE_FILE}: {e}")
            restored = {}
        device_states = restored
def save_device_state(device_name, state_data):
    """Persist one device's state into ``STATE_FILE`` and the in-memory cache.

    The file holds a JSON object keyed by device name. The existing content is
    read, the entry for ``device_name`` is replaced, and the result is written
    back, all under ``_state_lock``.

    Two failure modes of a plain read-modify-write are avoided:

    * If the existing file is corrupt (invalid JSON, or not a JSON object) it
      is treated as empty and rebuilt. Otherwise one bad file would make every
      later save fail, because the file would never be rewritten.
    * The new content is written to a temporary file and moved over
      ``STATE_FILE`` with ``os.replace``, so an interrupted write cannot leave
      a truncated state file behind.

    Args:
        device_name: Name of the device; used as the key in the state file.
        state_data: JSON-serialisable dict holding the device's state.

    Returns:
        None. I/O failures are logged, not raised; the in-memory cache is only
        updated after the file was written successfully.
    """
    global device_states
    with _state_lock:
        try:
            # Read the states of all devices currently on disk.
            full_states = {}
            if os.path.exists(STATE_FILE):
                with open(STATE_FILE, 'r', encoding='utf-8') as f:
                    content = f.read()
                if content:
                    try:
                        loaded = json.loads(content)
                    except json.JSONDecodeError as e:
                        logger.warning(f"[{device_name}] 状态文件已损坏,将重建: {e}")
                        loaded = {}
                    if isinstance(loaded, dict):
                        full_states = loaded
                    else:
                        logger.warning(f"[{device_name}] 状态文件格式异常,将重建")

            # Replace only this device's entry.
            full_states[device_name] = state_data

            # Write to a sibling temp file first, then swap it in atomically.
            # A fixed temp name is safe because all writers hold _state_lock.
            tmp_path = f"{STATE_FILE}.tmp"
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(full_states, f, indent=4, ensure_ascii=False)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, STATE_FILE)

            # Keep the in-memory cache in step with what is on disk.
            device_states[device_name] = state_data
            logger.info(f"[{device_name}] 状态保存成功")
        except OSError as e:
            logger.error(f"[{device_name}] 状态保存失败: {e}")
- # 核心业务函数
def create_db_connection():
    """Open a MySQL connection using the module-level ``DB_*`` settings.

    Rows are returned as dicts (``DictCursor``) and the connection charset is
    ``utf8mb4``.

    Returns:
        The open connection, or ``None`` if connecting raised
        ``pymysql.MySQLError`` (the error is logged).
    """
    connect_kwargs = dict(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME,
        port=DB_PORT,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
    )
    try:
        connection = pymysql.connect(**connect_kwargs)
    except pymysql.MySQLError as e:
        logger.error(f"数据库连接失败: {e}")
        return None
    logger.debug("数据库连接成功")
    return connection
def get_tmp_extremes(item_name, start_time, end_time, word_control):
    """Return the max and min historical value of ``item_name`` in a window.

    Only rows of ``item_name`` whose ``h_time`` matches a row of the control
    item ``word_control`` with ``val = 26`` inside the time window are
    considered.

    Args:
        item_name: Item whose extremes are wanted.
        start_time: Start of the window (``datetime``).
        end_time: End of the window (``datetime``).
        word_control: Item name of the control word used to select timestamps.

    Returns:
        ``(max_val, min_val)`` as floats, or ``(None, None)`` when the
        connection fails, the query errors, or no matching rows exist.
    """
    start_time_str = start_time.strftime(DATETIME_FORMAT)
    end_time_str = end_time.strftime(DATETIME_FORMAT)
    # NOTE(review): the control-word value 26 is hard-coded here while the
    # monitoring loop uses TRIGGER_VALUE from the config -- confirm they are
    # meant to be the same value.
    query = f"""
        SELECT
            MAX(val) AS max_val,
            MIN(val) AS min_val
        FROM {HISTORY_TABLE_NAME}
        WHERE project_id = %s
          AND item_name = %s
          AND h_time IN (
              SELECT h_time
              FROM {HISTORY_TABLE_NAME}
              WHERE project_id = %s
                AND item_name = %s
                AND val = 26
                AND h_time BETWEEN %s AND %s
          )
    """
    logger.info(f"查询历史极值 {item_name} 从 {start_time_str} 到 {end_time_str}")
    logger.debug(query)

    db_connection = create_db_connection()
    if not db_connection:
        return None, None

    # Placeholder values, in the order the %s markers appear in the query.
    sql_args = (
        PROJECT_ID_FOR_CALLBACK,
        item_name,
        PROJECT_ID_FOR_CALLBACK,
        word_control,
        start_time_str,
        end_time_str,
    )
    try:
        with db_connection.cursor() as cursor:
            cursor.execute(query, sql_args)
            result = cursor.fetchone()
            logger.debug(f"查询结果: {result}")
            # MAX/MIN over zero rows yields NULLs; treat that as "no data".
            if not result or result['max_val'] is None or result['min_val'] is None:
                logger.warning("查询未返回有效数据")
                return None, None
            max_val = float(result['max_val'])
            min_val = float(result['min_val'])
            logger.info(f"查询成功 最大值={max_val} 最小值={min_val}")
            return max_val, min_val
    except pymysql.MySQLError as e:
        logger.error(f"数据库查询错误: {e}")
        return None, None
    finally:
        # Always release the connection, whichever path returned.
        if db_connection and db_connection.open:
            db_connection.close()
def generate_md5_signature(record_data, secret, timestamp):
    """Build the upper-case hex MD5 signature for a PLC request.

    The digest is taken over the UTF-8 encoding of ``record_data``, ``secret``
    and ``timestamp`` concatenated in that order with no separators.

    Args:
        record_data: Serialized request records.
        secret: Shared secret.
        timestamp: Request timestamp.

    Returns:
        The 32-character upper-case hexadecimal MD5 digest.
    """
    signed_text = "{}{}{}".format(record_data, secret, timestamp)
    digest = hashlib.md5()
    digest.update(signed_text.encode('utf-8'))
    return digest.hexdigest().upper()
def send_plc_update(device_name, item, old_value, new_value, command_type):
    """Send one parameter-change command to the PLC endpoint, with retries.

    The request body is a one-element JSON list describing the change. The
    request is authenticated by a ``sign`` query parameter (MD5 over the
    serialized body, ``SCADA_SECRET`` and the timestamp) together with the
    ``timestamp`` query parameter.

    Args:
        device_name: Device name, used only for log messages.
        item: Name of the PLC item to change.
        old_value: Current value of the item.
        new_value: Value to write.
        command_type: Command type code passed through to the endpoint.

    Returns:
        ``True`` as soon as one attempt gets a response whose JSON ``code`` is
        200; ``False`` after three failed attempts (60 s apart).
    """
    # Build the signed request.
    timestamp = int(time.time())
    record_obj = {
        "project_id": PROJECT_ID_FOR_CALLBACK,
        "item": item,
        "old_value": old_value,
        "new_value": new_value,
        "command_type": command_type,
    }
    # The signature is computed over this exact serialization of the body.
    record_data = json.dumps([record_obj])
    signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp)
    # Fix: the separator had been corrupted to "×tamp=" ("&times" decoded as
    # an HTML entity), so the timestamp parameter was never sent and the sign
    # value was polluted. The query string must be "sign=...&timestamp=...".
    url = f"{PLC_URL}?sign={signature}&timestamp={timestamp}"
    payload = [record_obj]
    logger.info(f"[{device_name}] PLC指令 {item} 从 {old_value} 到 {new_value}")
    logger.debug(f"[{device_name}] 签名数据 {record_data}")
    logger.debug(f"[{device_name}] 签名值 {signature}")

    # NOTE(review): retries reuse the timestamp/signature created above, so
    # the last attempt is at least two minutes old. If the endpoint enforces
    # timestamp freshness, re-sign per attempt -- confirm with the API owner.
    max_retries, retry_interval = 3, 60
    for attempt in range(1, max_retries + 1):
        try:
            logger.info(f"[{device_name}] 发送PLC指令 尝试 {attempt}/{max_retries}")
            response = requests.post(url, json=payload, timeout=15)
            response_json = response.json()
            if response_json.get('code') == 200:
                logger.info(f"[{device_name}] PLC指令发送成功 响应 {response_json}")
                return True
            else:
                logger.error(f"[{device_name}] PLC指令发送失败 {response_json.get('msg', '未知错误')}")
        except requests.exceptions.RequestException as e:
            logger.error(f"[{device_name}] PLC指令网络错误 {e}")
        except Exception as e:
            # Deliberately broad: any malformed response counts as a failed
            # attempt and is retried rather than propagated.
            logger.error(f"[{device_name}] PLC指令未知错误 {e}")
        if attempt < max_retries:
            logger.info(f"[{device_name}] 等待{retry_interval}秒后重试")
            time.sleep(retry_interval)
    logger.error(f"[{device_name}] PLC指令发送失败,已达最大重试次数")
    return False
def send_decision_to_callback(type_name, **kwargs):
    """POST one decision record to the callback endpoint, with retries.

    The body is ``{"list": [record]}`` where ``record`` holds ``type``,
    ``project_id`` and every keyword argument passed in.

    Args:
        type_name: Value of the record's ``type`` field (also used as the log
            prefix).
        **kwargs: Additional fields of the decision record.

    Returns:
        ``True`` once an attempt completes without an HTTP error; ``False``
        after three failed attempts (60 s apart).
    """
    record = {"type": type_name, "project_id": PROJECT_ID_FOR_CALLBACK}
    record.update(kwargs)
    payload = {"list": [record]}
    logger.info(f"[{type_name}] 发送决策数据\n{json.dumps(payload, indent=2, ensure_ascii=False)}")

    max_retries, retry_interval = 3, 60
    attempt = 0
    while attempt < max_retries:
        attempt += 1
        try:
            logger.info(f"[{type_name}] 发送回调 尝试 {attempt}/{max_retries}")
            response = requests.post(CALLBACK_URL, headers=HEADERS, json=payload, timeout=15)
            response.raise_for_status()
            logger.info(f"[{type_name}] 回调发送成功 响应 {response.text}")
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"[{type_name}] 回调发送失败 {e}")
        # Pause between attempts, but not after the final one.
        if attempt != max_retries:
            logger.info(f"[{type_name}] 等待{retry_interval}秒后重试")
            time.sleep(retry_interval)

    logger.error(f"[{type_name}] 回调发送失败,已达最大重试次数")
    return False
def get_device_value(payload, device_name):
    """Fetch the current value of one device item from the data API.

    The payload is POSTed as a one-element JSON list. A usable response has
    ``code == 200`` and a non-empty ``data`` list whose first element carries
    the reading under ``val``.

    Every failure is logged and reported as ``None``: transport errors, a
    non-200 code, empty ``data``, a missing ``val``, and responses with an
    unexpected shape. Unexpected shapes raise ``KeyError``, ``TypeError`` or
    ``AttributeError``, which previously escaped this function.

    Args:
        payload: Request item; its ``deviceItems`` entry is used in log
            messages.
        device_name: Device name, used only for log messages.

    Returns:
        The reading as ``float``, or ``None`` if it could not be obtained.
    """
    try:
        response = requests.post(API_URL, headers=HEADERS, json=[payload], timeout=10)
        response.raise_for_status()
        api_response = response.json()
        data = api_response.get("data")
        if api_response.get("code") == 200 and data:
            val_str = data[0].get("val")
            if val_str is not None:
                return float(val_str)
        # Reached for a non-200 code, empty data, or a missing "val".
        failure_msg = api_response.get('msg', '未知错误')
    except requests.exceptions.RequestException as e:
        logger.error(f"[{device_name}] API网络错误 {payload['deviceItems']} {e}")
        return None
    except (ValueError, IndexError, KeyError, TypeError, AttributeError) as e:
        # ValueError also covers json.JSONDecodeError and non-numeric "val".
        logger.error(f"[{device_name}] API数据解析错误 {payload['deviceItems']} {e}")
        return None
    logger.error(f"[{device_name}] 获取数据失败 {payload['deviceItems']} {failure_msg}")
    return None
- # 设备监控主循环
def _wait_for_control_word(device, name, expect_trigger):
    """Poll the control word until it equals / stops equalling TRIGGER_VALUE.

    With ``expect_trigger=True`` this returns once the control word equals
    ``TRIGGER_VALUE``; with ``False`` once it differs. A failed read (``None``)
    never counts as a match in either direction; the loop just polls again
    after ``POLL_INTERVAL``.
    """
    while True:
        control_value = get_device_value(device["control_payload"], name)
        if control_value is not None:
            if (int(control_value) == TRIGGER_VALUE) == expect_trigger:
                return
        time.sleep(POLL_INTERVAL)


def _collect_tmp_samples(device, name):
    """Collect ``NUM_VALUES_TO_COLLECT`` TMP readings that differ from the previous one.

    Returns the list of collected readings, or ``None`` when the initial
    baseline reading could not be obtained.
    """
    last_known_value = get_device_value(device["target_payload"], name)
    if last_known_value is None:
        return None
    logger.info(f"TMP基准值 {last_known_value}")

    collected_values = []
    while len(collected_values) < NUM_VALUES_TO_COLLECT:
        current_value = get_device_value(device["target_payload"], name)
        # Failed reads and unchanged readings are skipped, not recorded.
        if current_value is not None and current_value != last_known_value:
            collected_values.append(current_value)
            logger.info(f"TMP变化 {last_known_value:.4f} 到 {current_value:.4f} 已收集 {len(collected_values)}/{NUM_VALUES_TO_COLLECT}")
            last_known_value = current_value
        time.sleep(POLL_INTERVAL)
    return collected_values


def monitor_device(device):
    """Run the endless monitoring loop for one device (thread target).

    Each round:
        1. Wait until the control word equals ``TRIGGER_VALUE``.
        2. Collect ``NUM_VALUES_TO_COLLECT`` changed TMP readings.
        3. Run the DQN decision on their median, post the result to the
           callback endpoint and, if ``system.use_model`` is 1 in the current
           config, send the new times to the PLC.
        4. Persist the round's state, then wait for the control word to leave
           ``TRIGGER_VALUE`` before starting the next round.

    Any exception inside a round is logged and the loop restarts after 60 s.

    Changes from the previous revision:
        * A failed control-word read while waiting for the reset no longer
          counts as "reset"; it used to start a second decision round within
          the same cycle.
        * When no TMP baseline can be read the loop waits ``POLL_INTERVAL``
          before retrying instead of retrying immediately.
        * The redundant ``sorted()`` before ``statistics.median`` is gone.

    Args:
        device: Device configuration dict; this function reads ``name``,
            ``control_payload``, ``target_payload``, ``press_pv_item``,
            ``production_time_payload`` and ``backwashing_payload``.
    """
    name = device["name"]
    threading.current_thread().name = name
    logger.info("监控线程启动")

    # Restore the state persisted by the previous round, if any.
    device_state = device_states.get(name, {})
    model_prev_L_s = device_state.get('model_prev_L_s')
    model_prev_t_bw_s = device_state.get('model_prev_t_bw_s')
    last_cycle_end_time_str = device_state.get('last_cycle_end_time')

    last_cycle_end_time = None
    if last_cycle_end_time_str:
        try:
            last_cycle_end_time = datetime.strptime(last_cycle_end_time_str, DATETIME_FORMAT)
            logger.info(f"历史状态加载成功,上次运行时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
        except ValueError:
            logger.warning(f"时间戳解析失败 {last_cycle_end_time_str}")
    else:
        logger.info("首次运行,无历史状态")

    while True:
        try:
            # Phase 1: wait for the trigger.
            logger.info(f"等待触发 控制字需等于 {TRIGGER_VALUE}")
            _wait_for_control_word(device, name, expect_trigger=True)
            logger.info("触发条件满足")

            # Phase 2: collect TMP samples.
            logger.info(f"开始收集TMP数据 需要 {NUM_VALUES_TO_COLLECT} 个有效数据点")
            collected_values = _collect_tmp_samples(device, name)
            if collected_values is None:
                logger.warning("无法获取TMP基准值,跳过本轮")
                # The trigger is usually still set, so pause before retrying
                # rather than hitting the API again immediately.
                time.sleep(POLL_INTERVAL)
                continue

            # Phase 3: decision.
            logger.info("数据收集完成,开始决策计算")
            if collected_values:
                # The median is the representative TMP for this cycle.
                median_value = statistics.median(collected_values)
                logger.info(f"TMP中位数 {median_value:.4f}")

                # History window: since the previous round, or the last 48 h
                # when no previous round is known.
                current_decision_time = datetime.now()
                start_query_time = last_cycle_end_time if last_cycle_end_time else current_decision_time - timedelta(hours=48)
                _word_controldevice = device["control_payload"]["deviceItems"]
                # NOTE(review): max_tmp/min_tmp are None when the query fails
                # or finds nothing -- confirm calc_uf_cycle_metrics accepts that.
                max_tmp, min_tmp = get_tmp_extremes(device["press_pv_item"], start_query_time, current_decision_time, _word_controldevice)

                logger.info("调用DQN决策模型")
                uf_bw_dict = run_uf_DQN_decide(uf_params, median_value)
                logger.info(f"模型决策结果 {uf_bw_dict}")

                # Current PLC settings; a failed (or zero) reading falls back
                # to the defaults 3800 / 100.
                prod_time = get_device_value(device["production_time_payload"], name) or 3800
                bw_time = get_device_value(device["backwashing_payload"], name) or 100

                # Combine current PLC values, the previous round's output and
                # the model's suggestion into the values to apply.
                L_s, t_bw_s = generate_plc_instructions(
                    prod_time, bw_time,
                    model_prev_L_s, model_prev_t_bw_s,
                    uf_bw_dict["L_s"], uf_bw_dict["t_bw_s"]
                )

                logger.info(f"计算运行指标 TMP={median_value} L_s={L_s} t_bw_s={t_bw_s}")
                metrics = calc_uf_cycle_metrics(uf_params, median_value, max_tmp, min_tmp, L_s, t_bw_s)

                # Report the decision and its metrics.
                send_decision_to_callback(
                    type_name=name,
                    water_production_time=int(L_s),
                    physical_backwash=int(t_bw_s),
                    ceb_backwash_frequency=int(metrics["k_bw_per_ceb"]),
                    duration_system=int(prod_time),
                    tmp_action=median_value,
                    recovery_rate=metrics["recovery"],
                    ton_water_energy_kWh=metrics['ton_water_energy_kWh_per_m3'],
                    max_permeability=metrics['max_permeability'],
                    daily_prod_time_h=metrics['daily_prod_time_h'],
                    ctime=current_decision_time.strftime(DATETIME_FORMAT)
                )

                # The model switch is re-read from disk each round so it can
                # be toggled while the service is running.
                if get_current_config()['system']['use_model'] == 1:
                    logger.info("模型开关已开启,下发PLC指令")
                    send_plc_update(name, device["production_time_payload"]["deviceItems"], str(prod_time), str(int(L_s)), 1)
                    send_plc_update(name, device["backwashing_payload"]["deviceItems"], str(bw_time), str(int(t_bw_s)), 2)
                else:
                    logger.info("模型开关已关闭,跳过PLC指令")

                # Remember this round's output for the next round and persist it.
                model_prev_L_s = L_s
                model_prev_t_bw_s = t_bw_s
                last_cycle_end_time = current_decision_time
                state_to_save = {
                    'model_prev_L_s': model_prev_L_s,
                    'model_prev_t_bw_s': model_prev_t_bw_s,
                    'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT)
                }
                save_device_state(name, state_to_save)
                logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")

            # Phase 4: wait until the control word leaves the trigger value so
            # the same cycle is not processed twice.
            logger.info(f"等待重置 控制字需不等于 {TRIGGER_VALUE}")
            _wait_for_control_word(device, name, expect_trigger=False)
            logger.info("重置条件满足,开始新一轮")
            logger.info(f"{name} 本轮完成\n")
        except Exception as e:
            # Thread-level boundary: log with traceback and keep the thread alive.
            logger.critical(f"监控循环异常 {e}", exc_info=True)
            logger.info("等待60秒后重试")
            time.sleep(60)
- # 程序主入口
def main():
    """Start the monitoring service.

    Loads the persisted device states, starts one daemon monitoring thread per
    entry in ``DEVICE_SEQUENCE``, then keeps the main thread alive while any of
    them is running. Ctrl+C ends the wait; the daemon threads end with the
    process.
    """
    banner = "========================================"
    logger.info(banner)
    logger.info("超滤并行监控服务启动")
    logger.info(banner)

    # Restore the state saved by earlier runs before any thread reads it.
    load_device_states()

    # One daemon worker thread per configured device.
    workers = []
    for device_config in DEVICE_SEQUENCE:
        worker = threading.Thread(
            target=monitor_device,
            args=(device_config,),
            daemon=True,
        )
        workers.append(worker)
        worker.start()
        logger.info(f"设备 {device_config['name']} 监控线程已启动")

    # Idle in the main thread until every worker has stopped or Ctrl+C arrives.
    try:
        while True:
            if not any(worker.is_alive() for worker in workers):
                break
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("检测到中断信号,程序退出")


if __name__ == "__main__":
    main()
|