| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797 |
- # 标准库导入
- import time
- import json
- import os
- import threading
- import hashlib
- from datetime import datetime, timedelta
- import logging
- from logging.handlers import RotatingFileHandler
- # 第三方库导入
- import pymysql
- import requests
- # 自定义模块导入
- from DQN_env import UFParams
- from DQN_decide import run_uf_DQN_decide, generate_plc_instructions, calc_uf_cycle_metrics
- # 日志系统配置
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.INFO)
- # 日志输出格式
- formatter = logging.Formatter(
- '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S'
- )
- # 文件日志处理器,单个文件最大5MB,保留3个备份
- file_handler = RotatingFileHandler('monitor_service.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
- file_handler.setFormatter(formatter)
- # 控制台日志处理器
- console_handler = logging.StreamHandler()
- console_handler.setFormatter(formatter)
- # 添加处理器
- logger.addHandler(file_handler)
- logger.addHandler(console_handler)
- # 配置加载函数
- def load_config(config_file='config.json'):
- """
- 从JSON配置文件加载系统配置
-
- 参数:
- config_file: 配置文件路径
-
- 返回:
- 配置字典
-
- 异常:
- 配置文件不存在或格式错误时抛出异常
- """
- try:
- with open(config_file, 'r', encoding='utf-8') as f:
- return json.load(f)
- except FileNotFoundError:
- logger.critical(f"配置文件未找到 {config_file}")
- raise
- except json.JSONDecodeError as e:
- logger.critical(f"配置文件格式错误 {config_file}: {e}")
- raise
- def get_current_config():
- """
- 获取当前配置,支持运行时配置动态变更
- """
- return load_config()
- # 初始化配置
- config = load_config()
- # 全局配置参数
- # API接口配置
- API_BASE_URL = config['api']['base_url']
- API_URL = API_BASE_URL + config['api']['current_data_endpoint']
- CALLBACK_URL = API_BASE_URL + config['api']['callback_endpoint']
- PLC_URL = API_BASE_URL + config['api']['plc_endpoint']
- # HTTP请求头
- HEADERS = {
- "Content-Type": "application/json",
- "JWT-TOKEN": config['api']['jwt_token']
- }
- # MySQL数据库配置,优先读取环境变量
- DB_USER = os.getenv('DB_USERNAME', config['database']['user'])
- DB_PASSWORD = os.getenv('DB_PASSWORD', config['database']['password'])
- DB_HOST = os.getenv('DB_HOST', config['database']['host'])
- DB_NAME = os.getenv('DB_DATABASE', config['database']['database'])
- DB_PORT = int(os.getenv('DB_PORT', str(config['database']['port'])))
- HISTORY_TABLE_NAME = config['database']['table_name']
- # 超滤系统参数
- uf_params = UFParams()
- PROJECT_ID_FOR_CALLBACK = config['scada']['project_id']
- SCADA_SECRET = config['scada']['secret']
- # 监控流程参数
- TRIGGER_VALUE = config['system']['trigger_value']
- NUM_VALUES_TO_COLLECT = config['system']['num_values_to_collect']
- POLL_INTERVAL = config['system']['poll_interval']
- # 设备列表
- DEVICE_SEQUENCE = config['devices']
- # 状态持久化配置
- STATE_FILE = 'device_states.json'
- _state_lock = threading.Lock()
- device_states = {}
- DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
- # 状态持久化函数
- def load_device_states():
- """
- 从状态文件加载所有设备的运行状态
- """
- global device_states
- with _state_lock:
- try:
- if os.path.exists(STATE_FILE):
- with open(STATE_FILE, 'r', encoding='utf-8') as f:
- content = f.read()
- if content:
- device_states = json.loads(content)
- logger.info(f"状态文件加载成功 {STATE_FILE}")
- else:
- logger.warning(f"状态文件为空 {STATE_FILE}")
- device_states = {}
- else:
- logger.info(f"状态文件不存在,首次运行 {STATE_FILE}")
- device_states = {}
- except (json.JSONDecodeError, IOError) as e:
- logger.error(f"状态文件加载失败 {STATE_FILE}: {e}")
- device_states = {}
- def save_device_state(device_name, state_data):
- """
- 保存单个设备的运行状态到文件
-
- 参数:
- device_name: 设备名称
- state_data: 设备状态数据字典
- """
- with _state_lock:
- try:
- # 读取现有状态
- full_states = {}
- if os.path.exists(STATE_FILE):
- with open(STATE_FILE, 'r', encoding='utf-8') as f:
- content = f.read()
- if content:
- full_states = json.loads(content)
- # 更新指定设备状态
- full_states[device_name] = state_data
- # 写回文件
- with open(STATE_FILE, 'w', encoding='utf-8') as f:
- json.dump(full_states, f, indent=4, ensure_ascii=False)
- # 更新内存缓存
- global device_states
- device_states[device_name] = state_data
- logger.info(f"[{device_name}] 状态保存成功")
- except (json.JSONDecodeError, IOError) as e:
- logger.error(f"[{device_name}] 状态保存失败: {e}")
- # 核心业务函数
- def create_db_connection():
- """
- 创建MySQL数据库连接
-
- 返回:
- 连接对象或None
- """
- try:
- connection = pymysql.connect(
- host=DB_HOST, user=DB_USER, password=DB_PASSWORD, database=DB_NAME,
- port=DB_PORT, charset='utf8mb4',
- cursorclass=pymysql.cursors.DictCursor
- )
- logger.debug("数据库连接成功")
- return connection
- except pymysql.MySQLError as e:
- logger.error(f"数据库连接失败: {e}")
- return None
- def get_tmp_extremes(item_name, start_time, end_time, word_control):
- """
- 通过API查询历史数据中指定时间范围内的跨膜压差极值
-
- 参数:
- item_name: 数据项名称
- start_time: 开始时间
- end_time: 结束时间
- word_control: 控制字段名
-
- 返回:
- (最大值, 最小值) 或 (None, None)
- """
- # 转换时间为毫秒级时间戳
- start_timestamp = int(start_time.timestamp() * 1000)
- end_timestamp = int(end_time.timestamp() * 1000)
-
- logger.info(f"查询历史极值 {item_name} 从 {start_time.strftime(DATETIME_FORMAT)} 到 {end_time.strftime(DATETIME_FORMAT)}")
-
- # API基础URL
- api_base_url = "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data"
-
- try:
- # 第一次调用:查询item_name的极值
- params1 = {
- "deviceid": "1",
- "dataitemid": item_name,
- "project_id": "92",
- "stime": start_timestamp,
- "etime": end_timestamp,
- "size": "1",
- "interval": "minute",
- "aggregator": "new"
- }
-
- logger.info(f"第一次API调用: {api_base_url} 参数: {params1}")
- response1 = requests.get(api_base_url, params=params1, headers=HEADERS, timeout=30)
- response1.raise_for_status()
- data1 = response1.json()
- logger.debug(f"第一次API响应: {data1}")
-
- # 第二次调用:查询word_control的极值
- params2 = {
- "deviceid": "1",
- "dataitemid": word_control,
- "project_id": "92",
- "stime": start_timestamp,
- "etime": end_timestamp,
- "size": "1",
- "interval": "minute",
- "aggregator": "new"
- }
-
- logger.info(f"第二次API调用: {api_base_url} 参数: {params2}")
- response2 = requests.get(api_base_url, params=params2, headers=HEADERS, timeout=30)
- response2.raise_for_status()
- data2 = response2.json()
- logger.debug(f"第二次API响应: {data2}")
- # 处理两次API调用的结果
- max_val = None
- min_val = None
- # 从第一次调用结果中提取'跨膜压差'的值,并存储在字典中,以时间为键
- item_values = {}
- if data1.get("code") == 200 and data1.get("data"):
- for item in data1["data"]:
- if item.get("name") == item_name and item.get("val") is not None:
- time = item.get("htime_at")
- item_values[time] = float(item.get("val"))
- if item_values:
- logger.info(f"第一次API查询成功,提取到跨膜压差数据数量:{len(item_values)}")
- # 从第二次调用结果中提取'UF1控制字'为26的数据点,并进行时间匹配
- if data2.get("code") == 200 and data2.get("data"):
- control_26_values = []
- for item in data2["data"]:
- if item.get("name") == word_control and item.get("val") == '26':
- time = item.get("htime_at")
- # 如果在第一次数据中找到了对应的跨膜压差值
- if time in item_values:
- control_26_values.append(item_values[time])
- if control_26_values:
- logger.info(f"找到控制字为26的数据点,合并跨膜压差数据")
- max_val = max(control_26_values)
- min_val = min(control_26_values)
- # 增加最小跨膜压差的下限值
- if min_val < 0.01:
- min_val = 0.01
- logger.info(f"控制字为26时的最大跨膜压差值={max_val},最小跨膜压差值={min_val}")
- if max_val is not None and min_val is not None:
- logger.info(f"API查询成功 最大跨膜压差值={max_val} 最小跨膜压差值={min_val}")
- return max_val, min_val
- else:
- logger.warning("未找到有效的控制字为26时的跨膜压差数据")
- return None, None
-
- except requests.exceptions.RequestException as e:
- logger.error(f"API请求错误: {e}")
- return None, None
- except (json.JSONDecodeError, ValueError, KeyError) as e:
- logger.error(f"API响应解析错误: {e}")
- return None, None
- except Exception as e:
- logger.error(f"API查询未知错误: {e}")
- return None, None
- def generate_md5_signature(record_data, secret, timestamp):
- """
- 生成PLC请求的MD5签名
- """
- cal_str = f"{record_data}{secret}{timestamp}"
- return hashlib.md5(cal_str.encode('utf-8')).hexdigest().upper()
- def send_plc_update(device_name, item, old_value, new_value, command_type):
- """
- 向PLC发送参数更新指令
-
- 参数:
- device_name: 设备名称
- item: 参数项名称
- old_value: 旧值
- new_value: 新值
- command_type: 指令类型
-
- 返回:
- 是否发送成功
- """
- # 构造签名和请求数据
- timestamp = int(time.time()) # 生成时间戳
- record_obj = {
- "project_id": PROJECT_ID_FOR_CALLBACK, # 项目ID
- "item": item, # 参数项名称
- "old_value": old_value, # 旧值
- "new_value": new_value, # 新值
- "command_type": command_type # 指令类型
- }
- record_data = json.dumps([record_obj]) # 生成签名数据
- signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp) # 生成签名
- url = f"{PLC_URL}?sign={signature}×tamp={timestamp}" # 生成请求URL
- payload = [record_obj]
- logger.info(f"[{device_name}] PLC指令 {item} 从 {old_value} 到 {new_value}")
- logger.debug(f"[{device_name}] 签名数据 {record_data}")
- logger.debug(f"[{device_name}] 签名值 {signature}")
- # 重试机制
- max_retries, retry_interval = 3, 60 # 重试次数 重试间隔
- for attempt in range(1, max_retries + 1):
- try:
- logger.info(f"[{device_name}] 发送PLC指令 尝试 {attempt}/{max_retries}")
- response = requests.post(url, json=payload, timeout=15) # 发送PLC指令 请求头 请求体 超时时间
- response_json = response.json()
- if response_json.get('code') == 200:
- logger.info(f"[{device_name}] PLC指令发送成功 响应 {response_json}")
- return True
- else:
- logger.error(f"[{device_name}] PLC指令发送失败 {response_json.get('msg', '未知错误')}")
- except requests.exceptions.RequestException as e:
- logger.error(f"[{device_name}] PLC指令网络错误 {e}")
- except Exception as e:
- logger.error(f"[{device_name}] PLC指令未知错误 {e}")
- if attempt < max_retries: # 重试次数 小于 最大重试次数
- logger.info(f"[{device_name}] 等待{retry_interval}秒后重试")
- time.sleep(retry_interval)
- logger.error(f"[{device_name}] PLC指令发送失败,已达最大重试次数")
- return False
- def send_decision_to_callback(type_name, **kwargs):
- """
- 发送决策结果到回调接口
-
- 参数:
- type_name: 设备类型名称
- **kwargs: 决策结果数据
-
- 返回:
- use_model状态值: 1表示开启模型,0表示关闭模型,None表示发送失败
- """
- payload = {"list": [{"type": type_name, "project_id": PROJECT_ID_FOR_CALLBACK, **kwargs}]} # 请求负载 设备类型 项目ID 决策结果数据
- logger.info(f"[{type_name}] 发送决策数据\n{json.dumps(payload, indent=2, ensure_ascii=False)}")
- max_retries, retry_interval = 3, 60 # 重试次数 重试间隔
- for attempt in range(1, max_retries + 1):
- try:
- logger.info(f"[{type_name}] 发送回调 尝试 {attempt}/{max_retries}")
- response = requests.post(CALLBACK_URL, headers=HEADERS, json=payload, timeout=15) # 发送回调 请求头 请求体 超时时间
- response.raise_for_status()
- response_json = response.json()
- logger.info(f"[{type_name}] 回调发送成功 响应 {response.text}")
-
- # 提取返回的 data 字段,表示 use_model 状态(1=开启,0=关闭)
- use_model_status = response_json.get('data')
- logger.info(f"[{type_name}] 服务器返回 use_model 状态: {use_model_status}")
- return use_model_status
- except requests.exceptions.RequestException as e:
- logger.error(f"[{type_name}] 回调发送失败 {e}")
- except (json.JSONDecodeError, ValueError) as e:
- logger.error(f"[{type_name}] 响应解析失败 {e}")
- if attempt < max_retries: # 重试次数 小于 最大重试次数
- logger.info(f"[{type_name}] 等待{retry_interval}秒后重试")
- time.sleep(retry_interval)
- logger.error(f"[{type_name}] 回调发送失败,已达最大重试次数")
- return None
- def get_device_value(payload, device_name):
- """
- 从API获取设备数据项的当前值
-
- 参数:
- payload: 请求负载
- device_name: 设备名称
-
- 返回:
- 数据值或None
- """
- try:
- response = requests.post(API_URL, headers=HEADERS, json=[payload], timeout=10) # 发送请求 请求头 请求体 超时时间
- response.raise_for_status()
- api_response = response.json() # 解析响应
- if api_response.get("code") == 200 and api_response.get("data"):
- val_str = api_response["data"][0].get("val") # 获取数据值
- if val_str is not None:
- return float(val_str)
- else:
- logger.error(f"[{device_name}] 获取数据失败 {payload['deviceItems']} {api_response.get('msg', '未知错误')}") # 日志 设备名称 请求负载 响应
- except requests.exceptions.RequestException as e:
- logger.error(f"[{device_name}] API网络错误 {payload['deviceItems']} {e}") # 日志 设备名称 请求负载 错误
- except (json.JSONDecodeError, ValueError, IndexError) as e:
- logger.error(f"[{device_name}] API数据解析错误 {payload['deviceItems']} {e}") # 日志 设备名称 请求负载 错误
- return None
- # 设备监控主循环
- def monitor_device(device):
- """
- 单个设备的监控循环
-
- 完整流程:
- 1. 等待触发条件
- 2. 收集稳定数据
- 3. 执行决策计算
- 4. 发送控制指令
- 5. 等待重置信号
-
- 参数:
- device: 设备配置字典
- """
- name = device["name"]
- threading.current_thread().name = name
- logger.info("监控线程启动")
- # 加载设备历史状态
- device_state = device_states.get(name, {}) # 设备状态
- model_prev_L_s = device_state.get('model_prev_L_s') # 过滤时间 上一轮
- model_prev_t_bw_s = device_state.get('model_prev_t_bw_s') # 反洗时间 上一轮
- last_cycle_end_time_str = device_state.get('last_cycle_end_time') # 上次运行结束时间
- # 解析上次运行结束时间
- last_cycle_end_time = None # 上次运行结束时间
- if last_cycle_end_time_str:
- try:
- last_cycle_end_time = datetime.strptime(last_cycle_end_time_str, DATETIME_FORMAT) # 上次运行结束时间
- logger.info(f"历史状态加载成功,上次运行时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
- except ValueError:
- logger.warning(f"时间戳解析失败 {last_cycle_end_time_str}")
- else:
- logger.info("首次运行,无历史状态")
- # 主循环
- while True:
- try:
- # 阶段1: 等待触发条件 (控制字=95)
- logger.info(f"等待触发 控制字需等于 {TRIGGER_VALUE}")
- while True:
- control_value = get_device_value(device["control_payload"], name) # 控制字
- if control_value is not None and int(control_value) == TRIGGER_VALUE: # 控制字 等于 触发值 95
- logger.info("触发条件满足,开始等待控制字变为26")
- break
- time.sleep(POLL_INTERVAL)
- # 阶段1.5: 等待控制字变为26
- logger.info("等待控制字变为26")
- while True:
- control_value = get_device_value(device["control_payload"], name) # 控制字
- if control_value is not None and int(control_value) == 26: # 控制字 等于 26
- logger.info("控制字变为26,开始收集数据(控制字在22-26范围内有效)")
- break
- time.sleep(POLL_INTERVAL)
- # 阶段2: 收集数据(控制字在22-26范围内视为有效,初始10分钟,未收集到有效数据时延长到30分钟)
- logger.info("开始收集TMP数据(控制字在22-26范围内有效)")
- collected_values = []
- start_collection_time = datetime.now()
- initial_duration = timedelta(minutes=10) # 初始10分钟
- extended_duration = timedelta(minutes=30) # 延长到30分钟
- collection_duration = initial_duration
- has_valid_data = False # 标记是否收集到22-26范围内的有效数据
-
- # 日志计数器,每收集60个点打印一次,避免日志过多
- log_interval = 60
-
- while datetime.now() - start_collection_time < collection_duration:
- current_value = get_device_value(device["target_payload"], name) # 当前值
- control_value = get_device_value(device["control_payload"], name) # 检查控制字
-
- if control_value is not None:
- control_int = int(control_value)
-
- # 如果控制字变为95,说明系统重置了,需要重新开始
- if control_int == TRIGGER_VALUE:
- logger.info("控制字变为95,系统重置,重新开始监控")
- break
-
- # 如果控制字在22-26范围内,视为有效,收集数据
- elif 22 <= control_int <= 26:
- has_valid_data = True
- if current_value is not None:
- collected_values.append(current_value)
- # 每收集60个点或第一个点时打印日志,减少日志数量
- if len(collected_values) == 1 or len(collected_values) % log_interval == 0:
- logger.info(f"收集TMP值 {current_value:.4f} 控制字={control_int} 已收集 {len(collected_values)} 个数据点")
-
- # 如果控制字不在22-26范围内,不停止,继续等待(可能后续会回到22-26范围)
- # 不记录日志,避免日志过多
-
- time.sleep(POLL_INTERVAL)
-
- # 如果10分钟内没有收集到22-26范围内的数据,延长收集时间到30分钟
- elapsed_time = datetime.now() - start_collection_time
- if elapsed_time >= initial_duration and not has_valid_data and collection_duration == initial_duration:
- logger.info("10分钟内未收集到22-26范围内的数据,延长收集时间到30分钟")
- collection_duration = extended_duration
-
- # 检查是否收集到了22-26范围内的有效数据
- if not has_valid_data:
- logger.warning("30分钟内未收集到22-26范围内的有效数据,跳过本轮")
- # 检查控制字状态,如果已经是95则直接开始新一轮
- control_value = get_device_value(device["control_payload"], name)
- if control_value is not None and int(control_value) == TRIGGER_VALUE:
- logger.info("控制字已经是95,直接开始新一轮")
- continue
- else:
- # 等待控制字重置后再继续
- logger.info("等待控制字重置...")
- time.sleep(10) # 等待10秒
- continue
-
- # 如果收集到了有效数据但数据为空
- if not collected_values:
- logger.warning("收集到有效控制字但未收集到TMP数据,跳过本轮")
- control_value = get_device_value(device["control_payload"], name)
- if control_value is not None and int(control_value) == TRIGGER_VALUE:
- logger.info("控制字已经是95,直接开始新一轮")
- continue
- else:
- logger.info("等待控制字重置...")
- time.sleep(10)
- continue
- # 阶段3: 决策计算
- logger.info(f"数据收集完成,共收集 {len(collected_values)} 个数据点,开始决策计算")
- if collected_values:
- # 计算平均值作为代表值
- average_value = sum(collected_values) / len(collected_values)
- logger.info(f"TMP平均值 {average_value:.4f}")
- # 确定历史数据查询时间范围
- current_decision_time = datetime.now()
- start_query_time = last_cycle_end_time if last_cycle_end_time else current_decision_time - timedelta(hours=48)
- _word_controldevice = device["control_payload"]["deviceItems"]
- # 查询历史极值
- max_tmp, min_tmp = get_tmp_extremes(device["press_pv_item"], start_query_time, current_decision_time, _word_controldevice)
- # 调用DQN模型获取决策建议
- logger.info("调用DQN决策模型")
- uf_bw_dict = run_uf_DQN_decide(uf_params, average_value)
- logger.info(f"模型决策结果 {uf_bw_dict}")
- # 获取当前PLC参数
- prod_time = get_device_value(device["production_time_payload"], name) or 3800 # 产水时间 默认3800
- bw_time = get_device_value(device["backwashing_payload"], name) or 100 # 反洗时间 默认100
- bw_per_ceb = get_device_value(device["ceb_payload"], name) or 40 # CEB 次数时间 默认40
- # 生成PLC指令
- L_s, t_bw_s = generate_plc_instructions(
- prod_time, bw_time, # 产水时间 反洗时间
- model_prev_L_s, model_prev_t_bw_s, # 过滤时间 反洗时间 上一轮
- uf_bw_dict["L_s"], uf_bw_dict["t_bw_s"] # 过滤时间 反洗时间 决策建议
- )
-
- # 计算运行指标
- logger.info(f"计算运行指标 TMP={average_value} L_s={L_s} t_bw_s={t_bw_s}")
- metrics = calc_uf_cycle_metrics(uf_params, average_value, max_tmp, min_tmp, L_s, t_bw_s) # 计算运行指标
- ceb_backwash_frequency = int(metrics["k_bw_per_ceb"])
-
- # 发送决策结果,并获取服务器返回的 use_model 状态
- use_model_status = send_decision_to_callback(
- type_name=name, # 设备名称
- water_production_time=int(L_s), # 过滤时间
- physical_backwash=int(t_bw_s), # 反洗时间
- ceb_backwash_frequency=ceb_backwash_frequency, # 化学反洗频率
- duration_system=int(prod_time), # 系统运行时间
- tmp_action=average_value, # TMP动作
- recovery_rate=metrics["recovery"], # 回收率
- ton_water_energy_kWh=metrics['ton_water_energy_kWh_per_m3'], # 吨水电耗
- max_permeability=metrics['max_permeability'], # 最高渗透率
- daily_prod_time_h=metrics['daily_prod_time_h'], # 日均产水时间
- ctime=current_decision_time.strftime(DATETIME_FORMAT) # 时间
- )
- # 判断是否下发PLC指令,根据服务器返回的 use_model 状态
- if use_model_status == 1:
- logger.info("模型开关已开启,检查PLC指令")
-
- # 记录当前PLC值和模型决策值
- current_plc_values = {
- 'prod_time': int(prod_time),
- 'bw_time': int(bw_time),
- 'bw_per_ceb': int(bw_per_ceb)
- }
- model_decision_values = {
- 'L_s': int(L_s),
- 't_bw_s': int(t_bw_s),
- 'ceb_frequency': int(ceb_backwash_frequency)
- }
-
- logger.info(f"当前PLC值: 产水时间={current_plc_values['prod_time']}, 反洗时间={current_plc_values['bw_time']}, CEB次数={current_plc_values['bw_per_ceb']}")
- logger.info(f"模型决策值: L_s={model_decision_values['L_s']}, t_bw_s={model_decision_values['t_bw_s']}, ceb_frequency={model_decision_values['ceb_frequency']}")
-
- # 检查每个参数是否需要下发指令
-
- # 检查产水时间是否需要更新
- if current_plc_values['prod_time'] != model_decision_values['L_s']:
- logger.info(f"产水时间需要更新: {current_plc_values['prod_time']} -> {model_decision_values['L_s']}")
- send_plc_update(name, device["production_time_payload"]["deviceItems"], str(prod_time), str(model_decision_values['L_s']), 1)
- else:
- logger.info(f"产水时间无需更新: {current_plc_values['prod_time']}")
-
- # 检查反洗时间是否需要更新
- if current_plc_values['bw_time'] != model_decision_values['t_bw_s']:
- logger.info(f"反洗时间需要更新: {current_plc_values['bw_time']} -> {model_decision_values['t_bw_s']}")
- send_plc_update(name, device["backwashing_payload"]["deviceItems"], str(bw_time), str(model_decision_values['t_bw_s']), 4)
- else:
- logger.info(f"反洗时间无需更新: {current_plc_values['bw_time']}")
-
- # 检查CEB次数是否需要更新
- if current_plc_values['bw_per_ceb'] != model_decision_values['ceb_frequency']:
- logger.info(f"CEB次数需要更新: {current_plc_values['bw_per_ceb']} -> {model_decision_values['ceb_frequency']}")
- send_plc_update(name, device["ceb_payload"]["deviceItems"], str(bw_per_ceb), str(model_decision_values['ceb_frequency']), 2)
- else:
- logger.info(f"CEB次数无需更新: {current_plc_values['bw_per_ceb']}")
- elif use_model_status == 0:
- logger.info("服务器返回 use_model=0,模型开关已关闭,跳过PLC指令")
- else:
- logger.warning("回调发送失败,无法获取 use_model 状态,跳过PLC指令")
- # 保存运行状态
- model_prev_L_s = L_s # 过滤时间 上一轮
- model_prev_t_bw_s = t_bw_s # 反洗时间 上一轮
- last_cycle_end_time = current_decision_time # 上次运行结束时间
-
- # 获取配置的TMP历史记录数量
- current_config = get_current_config()
- tmp_history_count = current_config.get('system', {}).get('tmp_history_count', 5)
-
- # 从最新的内存缓存中读取当前设备状态(确保获取最新的历史记录)
- current_device_state = device_states.get(name, {})
- recent_tmp_values = current_device_state.get('recent_tmp_values', [])
- recent_tmp_values.append(round(average_value, 4))
- # 只保留最近N次
- recent_tmp_values = recent_tmp_values[-tmp_history_count:]
- state_to_save = {
- 'model_prev_L_s': model_prev_L_s, # 过滤时间 上一轮
- 'model_prev_t_bw_s': model_prev_t_bw_s, # 反洗时间 上一轮
- 'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT), # 上次运行结束时间
- 'recent_tmp_values': recent_tmp_values # 最近N次TMP平均值(新增)
- }
- save_device_state(name, state_to_save) # 保存设备状态
- logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
- logger.info(f"最近{tmp_history_count}次TMP记录: {recent_tmp_values}")
- # 阶段4: 等待重置
- logger.info(f"等待重置 控制字需重新等于 {TRIGGER_VALUE}")
- # 等待一段时间,确保不是立即开始新一轮
- time.sleep(5) # 等待5秒
- while True:
- control_value = get_device_value(device["control_payload"], name) # 控制字
- if control_value is not None and int(control_value) == TRIGGER_VALUE: # 控制字 等于 触发值
- logger.info("完整周期结束,开始新一轮")
- break
- time.sleep(POLL_INTERVAL)
- logger.info(f"{name} 本轮完成\n")
- except Exception as e:
- logger.critical(f"监控循环异常 {e}", exc_info=True)
- logger.info("等待60秒后重试")
- time.sleep(60)
- # 程序主入口
- def main():
- """
- 主函数
-
- 功能:
- 1. 加载设备历史状态
- 2. 为每个设备启动独立监控线程
- 3. 保持主线程运行
- """
- logger.info("========================================")
- logger.info("超滤并行监控服务启动")
- logger.info("========================================")
- # 加载设备历史状态
- load_device_states()
- # 为每个设备创建监控线程
- threads = []
- for device_config in DEVICE_SEQUENCE:
- thread = threading.Thread(target=monitor_device, args=(device_config,), daemon=True)
- threads.append(thread)
- thread.start()
- logger.info(f"设备 {device_config['name']} 监控线程已启动")
- # 保持主线程运行
- try:
- while any(t.is_alive() for t in threads):
- time.sleep(1)
- except KeyboardInterrupt:
- logger.info("检测到中断信号,程序退出")
- def test_get_tmp_extremes():
- """
- 测试get_tmp_extremes函数的API调用
- """
- print("=" * 50)
- print("测试get_tmp_extremes API调用")
- print("=" * 50)
-
- # 设置测试参数
- test_item_name = "C.M.UF1_DB@press_PV" # 测试数据项
- test_word_control = "C.M.UF1_DB@word_control" # 测试控制字段
-
- # 设置测试时间范围(最近24小时)
- end_time = datetime.now()
- start_time = end_time - timedelta(hours=24)
-
- print(f"测试参数:")
- print(f" 数据项: {test_item_name}")
- print(f" 控制字段: {test_word_control}")
- print(f" 开始时间: {start_time.strftime(DATETIME_FORMAT)}")
- print(f" 结束时间: {end_time.strftime(DATETIME_FORMAT)}")
- print()
-
- try:
- # 调用函数
- max_val, min_val = get_tmp_extremes(test_item_name, start_time, end_time, test_word_control)
-
- print("测试结果:")
- if max_val is not None and min_val is not None:
- print(f" API调用成功")
- print(f" 最大值: {max_val}")
- print(f" 最小值: {min_val}")
- else:
- print(f" API调用失败或未返回有效数据")
- print(f" 最大值: {max_val}")
- print(f" 最小值: {min_val}")
-
- except Exception as e:
- print(f" 测试过程中发生异常: {e}")
-
- print("=" * 50)
- if __name__ == "__main__":
- # 运行测试用例
- # test_get_tmp_extremes()
-
- # 运行主程序
- main()
|