|
|
@@ -0,0 +1,810 @@
|
|
|
+# 标准库导入
|
|
|
+import time
|
|
|
+import json
|
|
|
+import os
|
|
|
+import threading
|
|
|
+import hashlib
|
|
|
+from datetime import datetime, timedelta
|
|
|
+import logging
|
|
|
+from logging.handlers import RotatingFileHandler
|
|
|
+
|
|
|
+# 第三方库导入
|
|
|
+import pymysql
|
|
|
+import requests
|
|
|
+
|
|
|
+# 自定义模块导入
|
|
|
+from DQN_env import UFParams
|
|
|
+from DQN_decide import run_uf_DQN_decide, generate_plc_instructions, calc_uf_cycle_metrics
|
|
|
+
|
|
|
+# 日志系统配置
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+logger.setLevel(logging.INFO)
|
|
|
+
|
|
|
+# 日志输出格式
|
|
|
+formatter = logging.Formatter(
|
|
|
+ '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
|
|
|
+ datefmt='%Y-%m-%d %H:%M:%S'
|
|
|
+)
|
|
|
+
|
|
|
+# 文件日志处理器,单个文件最大5MB,保留3个备份
|
|
|
+file_handler = RotatingFileHandler('monitor_service.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
|
|
|
+file_handler.setFormatter(formatter)
|
|
|
+
|
|
|
+# 控制台日志处理器
|
|
|
+console_handler = logging.StreamHandler()
|
|
|
+console_handler.setFormatter(formatter)
|
|
|
+
|
|
|
+# 添加处理器
|
|
|
+logger.addHandler(file_handler)
|
|
|
+logger.addHandler(console_handler)
|
|
|
+
|
|
|
+
|
|
|
+# 配置加载函数
|
|
|
+def load_config(config_file='config.json'):
|
|
|
+ """
|
|
|
+ 从JSON配置文件加载系统配置
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ config_file: 配置文件路径
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 配置字典
|
|
|
+
|
|
|
+ 异常:
|
|
|
+ 配置文件不存在或格式错误时抛出异常
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ with open(config_file, 'r', encoding='utf-8') as f:
|
|
|
+ return json.load(f)
|
|
|
+ except FileNotFoundError:
|
|
|
+ logger.critical(f"配置文件未找到 {config_file}")
|
|
|
+ raise
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ logger.critical(f"配置文件格式错误 {config_file}: {e}")
|
|
|
+ raise
|
|
|
+
|
|
|
+
|
|
|
+def get_current_config():
|
|
|
+ """
|
|
|
+ 获取当前配置,支持运行时配置动态变更
|
|
|
+ """
|
|
|
+ return load_config()
|
|
|
+
|
|
|
+
|
|
|
+# 初始化配置
|
|
|
+config = load_config()
|
|
|
+
|
|
|
+# 全局配置参数
|
|
|
+
|
|
|
+# API接口配置
|
|
|
+API_BASE_URL = config['api']['base_url']
|
|
|
+API_URL = API_BASE_URL + config['api']['current_data_endpoint']
|
|
|
+CALLBACK_URL = API_BASE_URL + config['api']['callback_endpoint']
|
|
|
+PLC_URL = API_BASE_URL + config['api']['plc_endpoint']
|
|
|
+
|
|
|
+# HTTP请求头
|
|
|
+HEADERS = {
|
|
|
+ "Content-Type": "application/json",
|
|
|
+ "JWT-TOKEN": config['api']['jwt_token']
|
|
|
+}
|
|
|
+
|
|
|
+# MySQL数据库配置,优先读取环境变量
|
|
|
+DB_USER = os.getenv('DB_USERNAME', config['database']['user'])
|
|
|
+DB_PASSWORD = os.getenv('DB_PASSWORD', config['database']['password'])
|
|
|
+DB_HOST = os.getenv('DB_HOST', config['database']['host'])
|
|
|
+DB_NAME = os.getenv('DB_DATABASE', config['database']['database'])
|
|
|
+DB_PORT = int(os.getenv('DB_PORT', str(config['database']['port'])))
|
|
|
+HISTORY_TABLE_NAME = config['database']['table_name']
|
|
|
+
|
|
|
+# 超滤系统参数
|
|
|
+uf_params = UFParams()
|
|
|
+PROJECT_ID_FOR_CALLBACK = config['scada']['project_id']
|
|
|
+SCADA_SECRET = config['scada']['secret']
|
|
|
+
|
|
|
+# 监控流程参数
|
|
|
+TRIGGER_VALUE = config['system']['trigger_value']
|
|
|
+NUM_VALUES_TO_COLLECT = config['system']['num_values_to_collect']
|
|
|
+POLL_INTERVAL = config['system']['poll_interval']
|
|
|
+
|
|
|
+# 设备列表
|
|
|
+DEVICE_SEQUENCE = config['devices']
|
|
|
+
|
|
|
+# 状态持久化配置
|
|
|
+STATE_FILE = 'device_states.json'
|
|
|
+_state_lock = threading.Lock()
|
|
|
+device_states = {}
|
|
|
+DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
|
|
|
+
|
|
|
+
|
|
|
+# 状态持久化函数
|
|
|
+def load_device_states():
|
|
|
+ """
|
|
|
+ 从状态文件加载所有设备的运行状态
|
|
|
+ """
|
|
|
+ global device_states
|
|
|
+ with _state_lock:
|
|
|
+ try:
|
|
|
+ if os.path.exists(STATE_FILE):
|
|
|
+ with open(STATE_FILE, 'r', encoding='utf-8') as f:
|
|
|
+ content = f.read()
|
|
|
+ if content:
|
|
|
+ device_states = json.loads(content)
|
|
|
+ logger.info(f"状态文件加载成功 {STATE_FILE}")
|
|
|
+ else:
|
|
|
+ logger.warning(f"状态文件为空 {STATE_FILE}")
|
|
|
+ device_states = {}
|
|
|
+ else:
|
|
|
+ logger.info(f"状态文件不存在,首次运行 {STATE_FILE}")
|
|
|
+ device_states = {}
|
|
|
+ except (json.JSONDecodeError, IOError) as e:
|
|
|
+ logger.error(f"状态文件加载失败 {STATE_FILE}: {e}")
|
|
|
+ device_states = {}
|
|
|
+
|
|
|
+
|
|
|
+def save_device_state(device_name, state_data):
|
|
|
+ """
|
|
|
+ 保存单个设备的运行状态到文件
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ device_name: 设备名称
|
|
|
+ state_data: 设备状态数据字典
|
|
|
+ """
|
|
|
+ with _state_lock:
|
|
|
+ try:
|
|
|
+ # 读取现有状态
|
|
|
+ full_states = {}
|
|
|
+ if os.path.exists(STATE_FILE):
|
|
|
+ with open(STATE_FILE, 'r', encoding='utf-8') as f:
|
|
|
+ content = f.read()
|
|
|
+ if content:
|
|
|
+ full_states = json.loads(content)
|
|
|
+
|
|
|
+ # 更新指定设备状态
|
|
|
+ full_states[device_name] = state_data
|
|
|
+
|
|
|
+ # 写回文件
|
|
|
+ with open(STATE_FILE, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(full_states, f, indent=4, ensure_ascii=False)
|
|
|
+
|
|
|
+ # 更新内存缓存
|
|
|
+ global device_states
|
|
|
+ device_states[device_name] = state_data
|
|
|
+ logger.info(f"[{device_name}] 状态保存成功")
|
|
|
+ except (json.JSONDecodeError, IOError) as e:
|
|
|
+ logger.error(f"[{device_name}] 状态保存失败: {e}")
|
|
|
+
|
|
|
+
|
|
|
+# 核心业务函数
|
|
|
+
|
|
|
+def create_db_connection():
|
|
|
+ """
|
|
|
+ 创建MySQL数据库连接
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 连接对象或None
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ connection = pymysql.connect(
|
|
|
+ host=DB_HOST, user=DB_USER, password=DB_PASSWORD, database=DB_NAME,
|
|
|
+ port=DB_PORT, charset='utf8mb4',
|
|
|
+ cursorclass=pymysql.cursors.DictCursor
|
|
|
+ )
|
|
|
+ logger.debug("数据库连接成功")
|
|
|
+ return connection
|
|
|
+ except pymysql.MySQLError as e:
|
|
|
+ logger.error(f"数据库连接失败: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def get_tmp_extremes(item_name, start_time, end_time, word_control):
|
|
|
+ """
|
|
|
+ 通过API查询历史数据中指定时间范围内的跨膜压差极值
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ item_name: 数据项名称
|
|
|
+ start_time: 开始时间
|
|
|
+ end_time: 结束时间
|
|
|
+ word_control: 控制字段名
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ (最大值, 最小值) 或 (None, None)
|
|
|
+ """
|
|
|
+ # 转换时间为毫秒级时间戳
|
|
|
+ start_timestamp = int(start_time.timestamp() * 1000)
|
|
|
+ end_timestamp = int(end_time.timestamp() * 1000)
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ f"查询历史极值 {item_name} 从 {start_time.strftime(DATETIME_FORMAT)} 到 {end_time.strftime(DATETIME_FORMAT)}")
|
|
|
+
|
|
|
+ # API基础URL
|
|
|
+ api_base_url = "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data"
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 第一次调用:查询item_name的极值
|
|
|
+ params1 = {
|
|
|
+ "deviceid": "1",
|
|
|
+ "dataitemid": item_name,
|
|
|
+ "project_id": "92",
|
|
|
+ "stime": start_timestamp,
|
|
|
+ "etime": end_timestamp,
|
|
|
+ "size": "1",
|
|
|
+ "interval": "minute",
|
|
|
+ "aggregator": "new"
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info(f"第一次API调用: {api_base_url} 参数: {params1}")
|
|
|
+ response1 = requests.get(api_base_url, params=params1, headers=HEADERS, timeout=30)
|
|
|
+ response1.raise_for_status()
|
|
|
+ data1 = response1.json()
|
|
|
+ logger.debug(f"第一次API响应: {data1}")
|
|
|
+
|
|
|
+ # 第二次调用:查询word_control的极值
|
|
|
+ params2 = {
|
|
|
+ "deviceid": "1",
|
|
|
+ "dataitemid": word_control,
|
|
|
+ "project_id": "92",
|
|
|
+ "stime": start_timestamp,
|
|
|
+ "etime": end_timestamp,
|
|
|
+ "size": "1",
|
|
|
+ "interval": "minute",
|
|
|
+ "aggregator": "new"
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info(f"第二次API调用: {api_base_url} 参数: {params2}")
|
|
|
+ response2 = requests.get(api_base_url, params=params2, headers=HEADERS, timeout=30)
|
|
|
+ response2.raise_for_status()
|
|
|
+ data2 = response2.json()
|
|
|
+ logger.debug(f"第二次API响应: {data2}")
|
|
|
+
|
|
|
+ # 处理两次API调用的结果
|
|
|
+ max_val = None
|
|
|
+ min_val = None
|
|
|
+
|
|
|
+ # 从第一次调用结果中提取'跨膜压差'的值,并存储在字典中,以时间为键
|
|
|
+ item_values = {}
|
|
|
+ if data1.get("code") == 200 and data1.get("data"):
|
|
|
+ for item in data1["data"]:
|
|
|
+ if item.get("name") == item_name and item.get("val") is not None:
|
|
|
+ time = item.get("htime_at")
|
|
|
+ item_values[time] = float(item.get("val"))
|
|
|
+ if item_values:
|
|
|
+ logger.info(f"第一次API查询成功,提取到跨膜压差数据数量:{len(item_values)}")
|
|
|
+
|
|
|
+ # 从第二次调用结果中提取'UF1控制字'为26的数据点,并进行时间匹配
|
|
|
+ if data2.get("code") == 200 and data2.get("data"):
|
|
|
+ control_26_values = []
|
|
|
+ for item in data2["data"]:
|
|
|
+ if item.get("name") == word_control and item.get("val") == '26':
|
|
|
+ time = item.get("htime_at")
|
|
|
+ # 如果在第一次数据中找到了对应的跨膜压差值
|
|
|
+ if time in item_values:
|
|
|
+ control_26_values.append(item_values[time])
|
|
|
+
|
|
|
+ if control_26_values:
|
|
|
+ logger.info(f"找到控制字为26的数据点,合并跨膜压差数据")
|
|
|
+ max_val = max(control_26_values)
|
|
|
+ min_val = min(control_26_values)
|
|
|
+ # 增加最小跨膜压差的下限值
|
|
|
+ if min_val < 0.01:
|
|
|
+ min_val = 0.01
|
|
|
+ logger.info(f"控制字为26时的最大跨膜压差值={max_val},最小跨膜压差值={min_val}")
|
|
|
+
|
|
|
+ if max_val is not None and min_val is not None:
|
|
|
+ logger.info(f"API查询成功 最大跨膜压差值={max_val} 最小跨膜压差值={min_val}")
|
|
|
+ return max_val, min_val
|
|
|
+ else:
|
|
|
+ logger.warning("未找到有效的控制字为26时的跨膜压差数据")
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
+ logger.error(f"API请求错误: {e}")
|
|
|
+ return None, None
|
|
|
+ except (json.JSONDecodeError, ValueError, KeyError) as e:
|
|
|
+ logger.error(f"API响应解析错误: {e}")
|
|
|
+ return None, None
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"API查询未知错误: {e}")
|
|
|
+ return None, None
|
|
|
+
|
|
|
+
|
|
|
+def generate_md5_signature(record_data, secret, timestamp):
|
|
|
+ """
|
|
|
+ 生成PLC请求的MD5签名
|
|
|
+ """
|
|
|
+ cal_str = f"{record_data}{secret}{timestamp}"
|
|
|
+ return hashlib.md5(cal_str.encode('utf-8')).hexdigest().upper()
|
|
|
+
|
|
|
+
|
|
|
+def send_plc_update(device_name, item, old_value, new_value, command_type):
|
|
|
+ """
|
|
|
+ 向PLC发送参数更新指令
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ device_name: 设备名称
|
|
|
+ item: 参数项名称
|
|
|
+ old_value: 旧值
|
|
|
+ new_value: 新值
|
|
|
+ command_type: 指令类型
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 是否发送成功
|
|
|
+ """
|
|
|
+ # 构造签名和请求数据
|
|
|
+ timestamp = int(time.time()) # 生成时间戳
|
|
|
+ record_obj = {
|
|
|
+ "project_id": PROJECT_ID_FOR_CALLBACK, # 项目ID
|
|
|
+ "item": item, # 参数项名称
|
|
|
+ "old_value": old_value, # 旧值
|
|
|
+ "new_value": new_value, # 新值
|
|
|
+ "command_type": command_type # 指令类型
|
|
|
+ }
|
|
|
+ record_data = json.dumps([record_obj]) # 生成签名数据
|
|
|
+ signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp) # 生成签名
|
|
|
+ url = f"{PLC_URL}?sign={signature}×tamp={timestamp}" # 生成请求URL
|
|
|
+ payload = [record_obj]
|
|
|
+
|
|
|
+ logger.info(f"[{device_name}] PLC指令 {item} 从 {old_value} 到 {new_value}")
|
|
|
+ logger.debug(f"[{device_name}] 签名数据 {record_data}")
|
|
|
+ logger.debug(f"[{device_name}] 签名值 {signature}")
|
|
|
+
|
|
|
+ # 重试机制
|
|
|
+ max_retries, retry_interval = 3, 60 # 重试次数 重试间隔
|
|
|
+ for attempt in range(1, max_retries + 1):
|
|
|
+ try:
|
|
|
+ logger.info(f"[{device_name}] 发送PLC指令 尝试 {attempt}/{max_retries}")
|
|
|
+ response = requests.post(url, json=payload, timeout=15) # 发送PLC指令 请求头 请求体 超时时间
|
|
|
+ response_json = response.json()
|
|
|
+ if response_json.get('code') == 200:
|
|
|
+ logger.info(f"[{device_name}] PLC指令发送成功 响应 {response_json}")
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ logger.error(f"[{device_name}] PLC指令发送失败 {response_json.get('msg', '未知错误')}")
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
+ logger.error(f"[{device_name}] PLC指令网络错误 {e}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"[{device_name}] PLC指令未知错误 {e}")
|
|
|
+
|
|
|
+ if attempt < max_retries: # 重试次数 小于 最大重试次数
|
|
|
+ logger.info(f"[{device_name}] 等待{retry_interval}秒后重试")
|
|
|
+ time.sleep(retry_interval)
|
|
|
+
|
|
|
+ logger.error(f"[{device_name}] PLC指令发送失败,已达最大重试次数")
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+def send_decision_to_callback(type_name, **kwargs):
|
|
|
+ """
|
|
|
+ 发送决策结果到回调接口
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ type_name: 设备类型名称
|
|
|
+ **kwargs: 决策结果数据
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ use_model状态值: 1表示开启模型,0表示关闭模型,None表示发送失败
|
|
|
+ """
|
|
|
+ payload = {"list": [{"type": type_name, "project_id": PROJECT_ID_FOR_CALLBACK, **kwargs}]} # 请求负载 设备类型 项目ID 决策结果数据
|
|
|
+
|
|
|
+ logger.info(f"[{type_name}] 发送决策数据\n{json.dumps(payload, indent=2, ensure_ascii=False)}")
|
|
|
+
|
|
|
+ max_retries, retry_interval = 3, 60 # 重试次数 重试间隔
|
|
|
+ for attempt in range(1, max_retries + 1):
|
|
|
+ try:
|
|
|
+ logger.info(f"[{type_name}] 发送回调 尝试 {attempt}/{max_retries}")
|
|
|
+ response = requests.post(CALLBACK_URL, headers=HEADERS, json=payload, timeout=15) # 发送回调 请求头 请求体 超时时间
|
|
|
+ response.raise_for_status()
|
|
|
+ response_json = response.json()
|
|
|
+ logger.info(f"[{type_name}] 回调发送成功 响应 {response.text}")
|
|
|
+
|
|
|
+ # 提取返回的 data 字段,表示 use_model 状态(1=开启,0=关闭)
|
|
|
+ use_model_status = response_json.get('data')
|
|
|
+ logger.info(f"[{type_name}] 服务器返回 use_model 状态: {use_model_status}")
|
|
|
+ return use_model_status
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
+ logger.error(f"[{type_name}] 回调发送失败 {e}")
|
|
|
+ except (json.JSONDecodeError, ValueError) as e:
|
|
|
+ logger.error(f"[{type_name}] 响应解析失败 {e}")
|
|
|
+
|
|
|
+ if attempt < max_retries: # 重试次数 小于 最大重试次数
|
|
|
+ logger.info(f"[{type_name}] 等待{retry_interval}秒后重试")
|
|
|
+ time.sleep(retry_interval)
|
|
|
+
|
|
|
+ logger.error(f"[{type_name}] 回调发送失败,已达最大重试次数")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def get_device_value(payload, device_name):
|
|
|
+ """
|
|
|
+ 从API获取设备数据项的当前值
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ payload: 请求负载
|
|
|
+ device_name: 设备名称
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ 数据值或None
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ response = requests.post(API_URL, headers=HEADERS, json=[payload], timeout=10) # 发送请求 请求头 请求体 超时时间
|
|
|
+ response.raise_for_status()
|
|
|
+ api_response = response.json() # 解析响应
|
|
|
+ if api_response.get("code") == 200 and api_response.get("data"):
|
|
|
+ val_str = api_response["data"][0].get("val") # 获取数据值
|
|
|
+ if val_str is not None:
|
|
|
+ return float(val_str)
|
|
|
+ else:
|
|
|
+ logger.error(
|
|
|
+ f"[{device_name}] 获取数据失败 {payload['deviceItems']} {api_response.get('msg', '未知错误')}") # 日志 设备名称 请求负载 响应
|
|
|
+ except requests.exceptions.RequestException as e:
|
|
|
+ logger.error(f"[{device_name}] API网络错误 {payload['deviceItems']} {e}") # 日志 设备名称 请求负载 错误
|
|
|
+ except (json.JSONDecodeError, ValueError, IndexError) as e:
|
|
|
+ logger.error(f"[{device_name}] API数据解析错误 {payload['deviceItems']} {e}") # 日志 设备名称 请求负载 错误
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+# 设备监控主循环
|
|
|
+
|
|
|
+def monitor_device(device):
|
|
|
+ """
|
|
|
+ 单个设备的监控循环
|
|
|
+
|
|
|
+ 完整流程:
|
|
|
+ 1. 等待触发条件
|
|
|
+ 2. 收集稳定数据
|
|
|
+ 3. 执行决策计算
|
|
|
+ 4. 发送控制指令
|
|
|
+ 5. 等待重置信号
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ device: 设备配置字典
|
|
|
+ """
|
|
|
+ name = device["name"]
|
|
|
+ threading.current_thread().name = name
|
|
|
+ logger.info("监控线程启动")
|
|
|
+
|
|
|
+ # 加载设备历史状态
|
|
|
+ device_state = device_states.get(name, {}) # 设备状态
|
|
|
+ model_prev_L_s = device_state.get('model_prev_L_s') # 过滤时间 上一轮
|
|
|
+ model_prev_t_bw_s = device_state.get('model_prev_t_bw_s') # 反洗时间 上一轮
|
|
|
+ last_cycle_end_time_str = device_state.get('last_cycle_end_time') # 上次运行结束时间
|
|
|
+
|
|
|
+ # 解析上次运行结束时间
|
|
|
+ last_cycle_end_time = None # 上次运行结束时间
|
|
|
+ if last_cycle_end_time_str:
|
|
|
+ try:
|
|
|
+ last_cycle_end_time = datetime.strptime(last_cycle_end_time_str, DATETIME_FORMAT) # 上次运行结束时间
|
|
|
+ logger.info(f"历史状态加载成功,上次运行时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
|
|
|
+ except ValueError:
|
|
|
+ logger.warning(f"时间戳解析失败 {last_cycle_end_time_str}")
|
|
|
+ else:
|
|
|
+ logger.info("首次运行,无历史状态")
|
|
|
+
|
|
|
+ # 主循环
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ # 阶段1: 等待触发条件 (控制字=95)
|
|
|
+ logger.info(f"等待触发 控制字需等于 {TRIGGER_VALUE}")
|
|
|
+ while True:
|
|
|
+ control_value = get_device_value(device["control_payload"], name) # 控制字
|
|
|
+ if control_value is not None and int(control_value) == TRIGGER_VALUE: # 控制字 等于 触发值 95
|
|
|
+ logger.info("触发条件满足,开始等待控制字变为26")
|
|
|
+ break
|
|
|
+ time.sleep(POLL_INTERVAL)
|
|
|
+
|
|
|
+ # 阶段1.5: 等待控制字变为26
|
|
|
+ logger.info("等待控制字变为26")
|
|
|
+ while True:
|
|
|
+ control_value = get_device_value(device["control_payload"], name) # 控制字
|
|
|
+ if control_value is not None and int(control_value) == 26: # 控制字 等于 26
|
|
|
+ logger.info("控制字变为26,开始收集数据(控制字在22-26范围内有效)")
|
|
|
+ break
|
|
|
+ time.sleep(POLL_INTERVAL)
|
|
|
+
|
|
|
+ # 阶段2: 收集数据(控制字在22-26范围内视为有效,初始10分钟,未收集到有效数据时延长到30分钟)
|
|
|
+ logger.info("开始收集TMP数据(控制字在22-26范围内有效)")
|
|
|
+ collected_values = []
|
|
|
+ start_collection_time = datetime.now()
|
|
|
+ initial_duration = timedelta(minutes=10) # 初始10分钟
|
|
|
+ extended_duration = timedelta(minutes=30) # 延长到30分钟
|
|
|
+ collection_duration = initial_duration
|
|
|
+ has_valid_data = False # 标记是否收集到22-26范围内的有效数据
|
|
|
+
|
|
|
+ # 日志计数器,每收集60个点打印一次,避免日志过多
|
|
|
+ log_interval = 60
|
|
|
+
|
|
|
+ while datetime.now() - start_collection_time < collection_duration:
|
|
|
+ current_value = get_device_value(device["target_payload"], name) # 当前值
|
|
|
+ control_value = get_device_value(device["control_payload"], name) # 检查控制字
|
|
|
+
|
|
|
+ if control_value is not None:
|
|
|
+ control_int = int(control_value)
|
|
|
+
|
|
|
+ # 如果控制字变为95,说明系统重置了,需要重新开始
|
|
|
+ if control_int == TRIGGER_VALUE:
|
|
|
+ logger.info("控制字变为95,系统重置,重新开始监控")
|
|
|
+ break
|
|
|
+
|
|
|
+ # 如果控制字在22-26范围内,视为有效,收集数据
|
|
|
+ elif 22 <= control_int <= 26:
|
|
|
+ has_valid_data = True
|
|
|
+ if current_value is not None:
|
|
|
+ collected_values.append(current_value)
|
|
|
+ # 每收集60个点或第一个点时打印日志,减少日志数量
|
|
|
+ if len(collected_values) == 1 or len(collected_values) % log_interval == 0:
|
|
|
+ logger.info(
|
|
|
+ f"收集TMP值 {current_value:.4f} 控制字={control_int} 已收集 {len(collected_values)} 个数据点")
|
|
|
+
|
|
|
+ # 如果控制字不在22-26范围内,不停止,继续等待(可能后续会回到22-26范围)
|
|
|
+ # 不记录日志,避免日志过多
|
|
|
+
|
|
|
+ time.sleep(POLL_INTERVAL)
|
|
|
+
|
|
|
+ # 如果10分钟内没有收集到22-26范围内的数据,延长收集时间到30分钟
|
|
|
+ elapsed_time = datetime.now() - start_collection_time
|
|
|
+ if elapsed_time >= initial_duration and not has_valid_data and collection_duration == initial_duration:
|
|
|
+ logger.info("10分钟内未收集到22-26范围内的数据,延长收集时间到30分钟")
|
|
|
+ collection_duration = extended_duration
|
|
|
+
|
|
|
+ # 检查是否收集到了22-26范围内的有效数据
|
|
|
+ if not has_valid_data:
|
|
|
+ logger.warning("30分钟内未收集到22-26范围内的有效数据,跳过本轮")
|
|
|
+ # 检查控制字状态,如果已经是95则直接开始新一轮
|
|
|
+ control_value = get_device_value(device["control_payload"], name)
|
|
|
+ if control_value is not None and int(control_value) == TRIGGER_VALUE:
|
|
|
+ logger.info("控制字已经是95,直接开始新一轮")
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ # 等待控制字重置后再继续
|
|
|
+ logger.info("等待控制字重置...")
|
|
|
+ time.sleep(10) # 等待10秒
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 如果收集到了有效数据但数据为空
|
|
|
+ if not collected_values:
|
|
|
+ logger.warning("收集到有效控制字但未收集到TMP数据,跳过本轮")
|
|
|
+ control_value = get_device_value(device["control_payload"], name)
|
|
|
+ if control_value is not None and int(control_value) == TRIGGER_VALUE:
|
|
|
+ logger.info("控制字已经是95,直接开始新一轮")
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ logger.info("等待控制字重置...")
|
|
|
+ time.sleep(10)
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 阶段3: 决策计算
|
|
|
+ logger.info(f"数据收集完成,共收集 {len(collected_values)} 个数据点,开始决策计算")
|
|
|
+ if collected_values:
|
|
|
+ # 计算平均值作为代表值
|
|
|
+ average_value = sum(collected_values) / len(collected_values)
|
|
|
+ logger.info(f"TMP平均值 {average_value:.4f}")
|
|
|
+
|
|
|
+ # 确定历史数据查询时间范围
|
|
|
+ current_decision_time = datetime.now()
|
|
|
+ start_query_time = last_cycle_end_time if last_cycle_end_time else current_decision_time - timedelta(
|
|
|
+ hours=48)
|
|
|
+ _word_controldevice = device["control_payload"]["deviceItems"]
|
|
|
+
|
|
|
+ # 查询历史极值
|
|
|
+ max_tmp, min_tmp = get_tmp_extremes(device["press_pv_item"], start_query_time, current_decision_time,
|
|
|
+ _word_controldevice)
|
|
|
+
|
|
|
+ # 调用DQN模型获取决策建议
|
|
|
+ logger.info("调用DQN决策模型")
|
|
|
+ uf_bw_dict = run_uf_DQN_decide(uf_params, average_value)
|
|
|
+ logger.info(f"模型决策结果 {uf_bw_dict}")
|
|
|
+
|
|
|
+ # 获取当前PLC参数
|
|
|
+ prod_time = get_device_value(device["production_time_payload"], name) or 3800 # 产水时间 默认3800
|
|
|
+ bw_time = get_device_value(device["backwashing_payload"], name) or 100 # 反洗时间 默认100
|
|
|
+ bw_per_ceb = get_device_value(device["ceb_payload"], name) or 40 # CEB 次数时间 默认40
|
|
|
+
|
|
|
+ # 生成PLC指令
|
|
|
+ L_s, t_bw_s = generate_plc_instructions(
|
|
|
+ prod_time, bw_time, # 产水时间 反洗时间
|
|
|
+ model_prev_L_s, model_prev_t_bw_s, # 过滤时间 反洗时间 上一轮
|
|
|
+ uf_bw_dict["L_s"], uf_bw_dict["t_bw_s"] # 过滤时间 反洗时间 决策建议
|
|
|
+ )
|
|
|
+
|
|
|
+ # 计算运行指标
|
|
|
+ logger.info(f"计算运行指标 TMP={average_value} L_s={L_s} t_bw_s={t_bw_s}")
|
|
|
+ metrics = calc_uf_cycle_metrics(uf_params, average_value, max_tmp, min_tmp, L_s, t_bw_s) # 计算运行指标
|
|
|
+ ceb_backwash_frequency = int(metrics["k_bw_per_ceb"])
|
|
|
+
|
|
|
+ # 发送决策结果,并获取服务器返回的 use_model 状态
|
|
|
+ use_model_status = send_decision_to_callback(
|
|
|
+ type_name=name, # 设备名称
|
|
|
+ water_production_time=int(L_s), # 过滤时间
|
|
|
+ physical_backwash=int(t_bw_s), # 反洗时间
|
|
|
+ ceb_backwash_frequency=ceb_backwash_frequency, # 化学反洗频率
|
|
|
+ duration_system=int(prod_time), # 系统运行时间
|
|
|
+ tmp_action=average_value, # TMP动作
|
|
|
+ recovery_rate=metrics["recovery"], # 回收率
|
|
|
+ ton_water_energy_kWh=metrics['ton_water_energy_kWh_per_m3'], # 吨水电耗
|
|
|
+ max_permeability=metrics['max_permeability'], # 最高渗透率
|
|
|
+ daily_prod_time_h=metrics['daily_prod_time_h'], # 日均产水时间
|
|
|
+ ctime=current_decision_time.strftime(DATETIME_FORMAT) # 时间
|
|
|
+ )
|
|
|
+
|
|
|
+ # 判断是否下发PLC指令,根据服务器返回的 use_model 状态
|
|
|
+ if use_model_status == 1:
|
|
|
+ logger.info("模型开关已开启,检查PLC指令")
|
|
|
+
|
|
|
+ # 记录当前PLC值和模型决策值
|
|
|
+ current_plc_values = {
|
|
|
+ 'prod_time': int(prod_time),
|
|
|
+ 'bw_time': int(bw_time),
|
|
|
+ 'bw_per_ceb': int(bw_per_ceb)
|
|
|
+ }
|
|
|
+ model_decision_values = {
|
|
|
+ 'L_s': int(L_s),
|
|
|
+ 't_bw_s': int(t_bw_s),
|
|
|
+ 'ceb_frequency': int(ceb_backwash_frequency)
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ f"当前PLC值: 产水时间={current_plc_values['prod_time']}, 反洗时间={current_plc_values['bw_time']}, CEB次数={current_plc_values['bw_per_ceb']}")
|
|
|
+ logger.info(
|
|
|
+ f"模型决策值: L_s={model_decision_values['L_s']}, t_bw_s={model_decision_values['t_bw_s']}, ceb_frequency={model_decision_values['ceb_frequency']}")
|
|
|
+
|
|
|
+ # 检查每个参数是否需要下发指令
|
|
|
+
|
|
|
+ # 检查产水时间是否需要更新
|
|
|
+ if current_plc_values['prod_time'] != model_decision_values['L_s']:
|
|
|
+ logger.info(
|
|
|
+ f"产水时间需要更新: {current_plc_values['prod_time']} -> {model_decision_values['L_s']}")
|
|
|
+ send_plc_update(name, device["production_time_payload"]["deviceItems"], str(prod_time),
|
|
|
+ str(model_decision_values['L_s']), 1)
|
|
|
+ else:
|
|
|
+ logger.info(f"产水时间无需更新: {current_plc_values['prod_time']}")
|
|
|
+
|
|
|
+ # 检查反洗时间是否需要更新
|
|
|
+ if current_plc_values['bw_time'] != model_decision_values['t_bw_s']:
|
|
|
+ logger.info(
|
|
|
+ f"反洗时间需要更新: {current_plc_values['bw_time']} -> {model_decision_values['t_bw_s']}")
|
|
|
+ send_plc_update(name, device["backwashing_payload"]["deviceItems"], str(bw_time),
|
|
|
+ str(model_decision_values['t_bw_s']), 4)
|
|
|
+ else:
|
|
|
+ logger.info(f"反洗时间无需更新: {current_plc_values['bw_time']}")
|
|
|
+
|
|
|
+ # 检查CEB次数是否需要更新
|
|
|
+ if current_plc_values['bw_per_ceb'] != model_decision_values['ceb_frequency']:
|
|
|
+ logger.info(
|
|
|
+ f"CEB次数需要更新: {current_plc_values['bw_per_ceb']} -> {model_decision_values['ceb_frequency']}")
|
|
|
+ send_plc_update(name, device["ceb_payload"]["deviceItems"], str(bw_per_ceb),
|
|
|
+ str(model_decision_values['ceb_frequency']), 2)
|
|
|
+ else:
|
|
|
+ logger.info(f"CEB次数无需更新: {current_plc_values['bw_per_ceb']}")
|
|
|
+
|
|
|
+ elif use_model_status == 0:
|
|
|
+ logger.info("服务器返回 use_model=0,模型开关已关闭,跳过PLC指令")
|
|
|
+ else:
|
|
|
+ logger.warning("回调发送失败,无法获取 use_model 状态,跳过PLC指令")
|
|
|
+
|
|
|
+ # 保存运行状态
|
|
|
+ model_prev_L_s = L_s # 过滤时间 上一轮
|
|
|
+ model_prev_t_bw_s = t_bw_s # 反洗时间 上一轮
|
|
|
+ last_cycle_end_time = current_decision_time # 上次运行结束时间
|
|
|
+
|
|
|
+ # 获取配置的TMP历史记录数量
|
|
|
+ current_config = get_current_config()
|
|
|
+ tmp_history_count = current_config.get('system', {}).get('tmp_history_count', 5)
|
|
|
+
|
|
|
+ # 从最新的内存缓存中读取当前设备状态(确保获取最新的历史记录)
|
|
|
+ current_device_state = device_states.get(name, {})
|
|
|
+ recent_tmp_values = current_device_state.get('recent_tmp_values', [])
|
|
|
+ recent_tmp_values.append(round(average_value, 4))
|
|
|
+ # 只保留最近N次
|
|
|
+ recent_tmp_values = recent_tmp_values[-tmp_history_count:]
|
|
|
+
|
|
|
+ state_to_save = {
|
|
|
+ 'model_prev_L_s': model_prev_L_s, # 过滤时间 上一轮
|
|
|
+ 'model_prev_t_bw_s': model_prev_t_bw_s, # 反洗时间 上一轮
|
|
|
+ 'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT), # 上次运行结束时间
|
|
|
+ 'recent_tmp_values': recent_tmp_values # 最近N次TMP平均值(新增)
|
|
|
+ }
|
|
|
+ save_device_state(name, state_to_save) # 保存设备状态
|
|
|
+ logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
|
|
|
+ logger.info(f"最近{tmp_history_count}次TMP记录: {recent_tmp_values}")
|
|
|
+
|
|
|
+ # 阶段4: 等待重置
|
|
|
+ logger.info(f"等待重置 控制字需重新等于 {TRIGGER_VALUE}")
|
|
|
+ # 等待一段时间,确保不是立即开始新一轮
|
|
|
+ time.sleep(5) # 等待5秒
|
|
|
+ while True:
|
|
|
+ control_value = get_device_value(device["control_payload"], name) # 控制字
|
|
|
+ if control_value is not None and int(control_value) == TRIGGER_VALUE: # 控制字 等于 触发值
|
|
|
+ logger.info("完整周期结束,开始新一轮")
|
|
|
+ break
|
|
|
+ time.sleep(POLL_INTERVAL)
|
|
|
+
|
|
|
+ logger.info(f"{name} 本轮完成\n")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.critical(f"监控循环异常 {e}", exc_info=True)
|
|
|
+ logger.info("等待60秒后重试")
|
|
|
+ time.sleep(60)
|
|
|
+
|
|
|
+
|
|
|
+# 程序主入口
|
|
|
+
|
|
|
+def main():
|
|
|
+ """
|
|
|
+ 主函数
|
|
|
+
|
|
|
+ 功能:
|
|
|
+ 1. 加载设备历史状态
|
|
|
+ 2. 为每个设备启动独立监控线程
|
|
|
+ 3. 保持主线程运行
|
|
|
+ """
|
|
|
+ logger.info("========================================")
|
|
|
+ logger.info("超滤并行监控服务启动")
|
|
|
+ logger.info("========================================")
|
|
|
+
|
|
|
+ # 加载设备历史状态
|
|
|
+ load_device_states()
|
|
|
+
|
|
|
+ # 为每个设备创建监控线程
|
|
|
+ threads = []
|
|
|
+ for device_config in DEVICE_SEQUENCE:
|
|
|
+ thread = threading.Thread(target=monitor_device, args=(device_config,), daemon=True)
|
|
|
+ threads.append(thread)
|
|
|
+ thread.start()
|
|
|
+ logger.info(f"设备 {device_config['name']} 监控线程已启动")
|
|
|
+
|
|
|
+ # 保持主线程运行
|
|
|
+ try:
|
|
|
+ while any(t.is_alive() for t in threads):
|
|
|
+ time.sleep(1)
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ logger.info("检测到中断信号,程序退出")
|
|
|
+
|
|
|
+
|
|
|
+def test_get_tmp_extremes():
|
|
|
+ """
|
|
|
+ 测试get_tmp_extremes函数的API调用
|
|
|
+ """
|
|
|
+ print("=" * 50)
|
|
|
+ print("测试get_tmp_extremes API调用")
|
|
|
+ print("=" * 50)
|
|
|
+
|
|
|
+ # 设置测试参数
|
|
|
+ test_item_name = "C.M.UF1_DB@press_PV" # 测试数据项
|
|
|
+ test_word_control = "C.M.UF1_DB@word_control" # 测试控制字段
|
|
|
+
|
|
|
+ # 设置测试时间范围(最近24小时)
|
|
|
+ end_time = datetime.now()
|
|
|
+ start_time = end_time - timedelta(hours=24)
|
|
|
+
|
|
|
+ print(f"测试参数:")
|
|
|
+ print(f" 数据项: {test_item_name}")
|
|
|
+ print(f" 控制字段: {test_word_control}")
|
|
|
+ print(f" 开始时间: {start_time.strftime(DATETIME_FORMAT)}")
|
|
|
+ print(f" 结束时间: {end_time.strftime(DATETIME_FORMAT)}")
|
|
|
+ print()
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 调用函数
|
|
|
+ max_val, min_val = get_tmp_extremes(test_item_name, start_time, end_time, test_word_control)
|
|
|
+
|
|
|
+ print("测试结果:")
|
|
|
+ if max_val is not None and min_val is not None:
|
|
|
+ print(f" API调用成功")
|
|
|
+ print(f" 最大值: {max_val}")
|
|
|
+ print(f" 最小值: {min_val}")
|
|
|
+ else:
|
|
|
+ print(f" API调用失败或未返回有效数据")
|
|
|
+ print(f" 最大值: {max_val}")
|
|
|
+ print(f" 最小值: {min_val}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 测试过程中发生异常: {e}")
|
|
|
+
|
|
|
+ print("=" * 50)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ # 运行测试用例
|
|
|
+ # test_get_tmp_extremes()
|
|
|
+
|
|
|
+ # 运行主程序
|
|
|
+ main()
|