loop_main.py 32 KB


  1. # 标准库导入
  2. import time
  3. import json
  4. import os
  5. import threading
  6. import hashlib
  7. from datetime import datetime, timedelta
  8. import logging
  9. from logging.handlers import RotatingFileHandler
  10. # 第三方库导入
  11. import pymysql
  12. import requests
  13. # 自定义模块导入
  14. from DQN_env import UFParams
  15. from DQN_decide import run_uf_DQN_decide, generate_plc_instructions, calc_uf_cycle_metrics
  16. # 日志系统配置
  17. logger = logging.getLogger(__name__)
  18. logger.setLevel(logging.INFO)
  19. # 日志输出格式
  20. formatter = logging.Formatter(
  21. '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
  22. datefmt='%Y-%m-%d %H:%M:%S'
  23. )
  24. # 文件日志处理器,单个文件最大5MB,保留3个备份
  25. file_handler = RotatingFileHandler('monitor_service.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
  26. file_handler.setFormatter(formatter)
  27. # 控制台日志处理器
  28. console_handler = logging.StreamHandler()
  29. console_handler.setFormatter(formatter)
  30. # 添加处理器
  31. logger.addHandler(file_handler)
  32. logger.addHandler(console_handler)
  33. # 配置加载函数
  34. def load_config(config_file='config.json'):
  35. """
  36. 从JSON配置文件加载系统配置
  37. 参数:
  38. config_file: 配置文件路径
  39. 返回:
  40. 配置字典
  41. 异常:
  42. 配置文件不存在或格式错误时抛出异常
  43. """
  44. try:
  45. with open(config_file, 'r', encoding='utf-8') as f:
  46. return json.load(f)
  47. except FileNotFoundError:
  48. logger.critical(f"配置文件未找到 {config_file}")
  49. raise
  50. except json.JSONDecodeError as e:
  51. logger.critical(f"配置文件格式错误 {config_file}: {e}")
  52. raise
  53. def get_current_config():
  54. """
  55. 获取当前配置,支持运行时配置动态变更
  56. """
  57. return load_config()
  58. # 初始化配置
  59. config = load_config()
  60. # 全局配置参数
  61. # API接口配置
  62. API_BASE_URL = config['api']['base_url']
  63. API_URL = API_BASE_URL + config['api']['current_data_endpoint']
  64. CALLBACK_URL = API_BASE_URL + config['api']['callback_endpoint']
  65. PLC_URL = API_BASE_URL + config['api']['plc_endpoint']
  66. # HTTP请求头
  67. HEADERS = {
  68. "Content-Type": "application/json",
  69. "JWT-TOKEN": config['api']['jwt_token']
  70. }
  71. # MySQL数据库配置,优先读取环境变量
  72. DB_USER = os.getenv('DB_USERNAME', config['database']['user'])
  73. DB_PASSWORD = os.getenv('DB_PASSWORD', config['database']['password'])
  74. DB_HOST = os.getenv('DB_HOST', config['database']['host'])
  75. DB_NAME = os.getenv('DB_DATABASE', config['database']['database'])
  76. DB_PORT = int(os.getenv('DB_PORT', str(config['database']['port'])))
  77. HISTORY_TABLE_NAME = config['database']['table_name']
  78. # 超滤系统参数
  79. uf_params = UFParams()
  80. PROJECT_ID_FOR_CALLBACK = config['scada']['project_id']
  81. SCADA_SECRET = config['scada']['secret']
  82. # 监控流程参数
  83. TRIGGER_VALUE = config['system']['trigger_value']
  84. NUM_VALUES_TO_COLLECT = config['system']['num_values_to_collect']
  85. POLL_INTERVAL = config['system']['poll_interval']
  86. # 设备列表
  87. DEVICE_SEQUENCE = config['devices']
  88. # 状态持久化配置
  89. STATE_FILE = 'device_states.json'
  90. _state_lock = threading.Lock()
  91. device_states = {}
  92. DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
  93. # 状态持久化函数
  94. def load_device_states():
  95. """
  96. 从状态文件加载所有设备的运行状态
  97. """
  98. global device_states
  99. with _state_lock:
  100. try:
  101. if os.path.exists(STATE_FILE):
  102. with open(STATE_FILE, 'r', encoding='utf-8') as f:
  103. content = f.read()
  104. if content:
  105. device_states = json.loads(content)
  106. logger.info(f"状态文件加载成功 {STATE_FILE}")
  107. else:
  108. logger.warning(f"状态文件为空 {STATE_FILE}")
  109. device_states = {}
  110. else:
  111. logger.info(f"状态文件不存在,首次运行 {STATE_FILE}")
  112. device_states = {}
  113. except (json.JSONDecodeError, IOError) as e:
  114. logger.error(f"状态文件加载失败 {STATE_FILE}: {e}")
  115. device_states = {}
  116. def save_device_state(device_name, state_data):
  117. """
  118. 保存单个设备的运行状态到文件
  119. 参数:
  120. device_name: 设备名称
  121. state_data: 设备状态数据字典
  122. """
  123. with _state_lock:
  124. try:
  125. # 读取现有状态
  126. full_states = {}
  127. if os.path.exists(STATE_FILE):
  128. with open(STATE_FILE, 'r', encoding='utf-8') as f:
  129. content = f.read()
  130. if content:
  131. full_states = json.loads(content)
  132. # 更新指定设备状态
  133. full_states[device_name] = state_data
  134. # 写回文件
  135. with open(STATE_FILE, 'w', encoding='utf-8') as f:
  136. json.dump(full_states, f, indent=4, ensure_ascii=False)
  137. # 更新内存缓存
  138. global device_states
  139. device_states[device_name] = state_data
  140. logger.info(f"[{device_name}] 状态保存成功")
  141. except (json.JSONDecodeError, IOError) as e:
  142. logger.error(f"[{device_name}] 状态保存失败: {e}")
  143. # 核心业务函数
  144. def create_db_connection():
  145. """
  146. 创建MySQL数据库连接
  147. 返回:
  148. 连接对象或None
  149. """
  150. try:
  151. connection = pymysql.connect(
  152. host=DB_HOST, user=DB_USER, password=DB_PASSWORD, database=DB_NAME,
  153. port=DB_PORT, charset='utf8mb4',
  154. cursorclass=pymysql.cursors.DictCursor
  155. )
  156. logger.debug("数据库连接成功")
  157. return connection
  158. except pymysql.MySQLError as e:
  159. logger.error(f"数据库连接失败: {e}")
  160. return None
  161. def get_tmp_extremes(item_name, start_time, end_time, word_control):
  162. """
  163. 通过API查询历史数据中指定时间范围内的跨膜压差极值
  164. 参数:
  165. item_name: 数据项名称
  166. start_time: 开始时间
  167. end_time: 结束时间
  168. word_control: 控制字段名
  169. 返回:
  170. (最大值, 最小值) 或 (None, None)
  171. """
  172. # 转换时间为毫秒级时间戳
  173. start_timestamp = int(start_time.timestamp() * 1000)
  174. end_timestamp = int(end_time.timestamp() * 1000)
  175. logger.info(f"查询历史极值 {item_name} 从 {start_time.strftime(DATETIME_FORMAT)} 到 {end_time.strftime(DATETIME_FORMAT)}")
  176. # API基础URL
  177. api_base_url = "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data"
  178. try:
  179. # 第一次调用:查询item_name的极值
  180. params1 = {
  181. "deviceid": "1",
  182. "dataitemid": item_name,
  183. "project_id": "92",
  184. "stime": start_timestamp,
  185. "etime": end_timestamp,
  186. "size": "1",
  187. "interval": "minute",
  188. "aggregator": "new"
  189. }
  190. logger.info(f"第一次API调用: {api_base_url} 参数: {params1}")
  191. response1 = requests.get(api_base_url, params=params1, headers=HEADERS, timeout=30)
  192. response1.raise_for_status()
  193. data1 = response1.json()
  194. logger.debug(f"第一次API响应: {data1}")
  195. # 第二次调用:查询word_control的极值
  196. params2 = {
  197. "deviceid": "1",
  198. "dataitemid": word_control,
  199. "project_id": "92",
  200. "stime": start_timestamp,
  201. "etime": end_timestamp,
  202. "size": "1",
  203. "interval": "minute",
  204. "aggregator": "new"
  205. }
  206. logger.info(f"第二次API调用: {api_base_url} 参数: {params2}")
  207. response2 = requests.get(api_base_url, params=params2, headers=HEADERS, timeout=30)
  208. response2.raise_for_status()
  209. data2 = response2.json()
  210. logger.debug(f"第二次API响应: {data2}")
  211. # 处理两次API调用的结果
  212. max_val = None
  213. min_val = None
  214. # 从第一次调用结果中提取'UF1跨膜压差'的值,并存储在字典中,以时间为键
  215. uf1_diff_values = {}
  216. if data1.get("code") == 200 and data1.get("data"):
  217. for item in data1["data"]:
  218. if item.get("name") == "UF1跨膜压差" and item.get("val") is not None:
  219. time = item.get("htime_at")
  220. uf1_diff_values[time] = float(item.get("val"))
  221. if uf1_diff_values:
  222. logger.info(f"第一次API查询成功,提取到跨膜压差数据数量:{len(uf1_diff_values)}")
  223. # 从第二次调用结果中提取'UF1控制字'为26的数据点,并进行时间匹配
  224. if data2.get("code") == 200 and data2.get("data"):
  225. control_26_values = []
  226. for item in data2["data"]:
  227. if item.get("name") == "UF1控制字" and item.get("val") == '26':
  228. time = item.get("htime_at")
  229. # 如果在第一次数据中找到了对应的跨膜压差值
  230. if time in uf1_diff_values:
  231. control_26_values.append(uf1_diff_values[time])
  232. if control_26_values:
  233. logger.info(f"找到控制字为26的数据点,合并跨膜压差数据")
  234. max_val = max(control_26_values)
  235. min_val = min(control_26_values)
  236. # 增加最小跨膜压差的下限值
  237. if min_val < 0.01:
  238. min_val = 0.01
  239. logger.info(f"控制字为26时的最大跨膜压差值={max_val},最小跨膜压差值={min_val}")
  240. if max_val is not None and min_val is not None:
  241. logger.info(f"API查询成功 最大跨膜压差值={max_val} 最小跨膜压差值={min_val}")
  242. return max_val, min_val
  243. else:
  244. logger.warning("未找到有效的控制字为26时的跨膜压差数据")
  245. return None, None
  246. except requests.exceptions.RequestException as e:
  247. logger.error(f"API请求错误: {e}")
  248. return None, None
  249. except (json.JSONDecodeError, ValueError, KeyError) as e:
  250. logger.error(f"API响应解析错误: {e}")
  251. return None, None
  252. except Exception as e:
  253. logger.error(f"API查询未知错误: {e}")
  254. return None, None
  255. def generate_md5_signature(record_data, secret, timestamp):
  256. """
  257. 生成PLC请求的MD5签名
  258. """
  259. cal_str = f"{record_data}{secret}{timestamp}"
  260. return hashlib.md5(cal_str.encode('utf-8')).hexdigest().upper()
  261. def send_plc_update(device_name, item, old_value, new_value, command_type):
  262. """
  263. 向PLC发送参数更新指令
  264. 参数:
  265. device_name: 设备名称
  266. item: 参数项名称
  267. old_value: 旧值
  268. new_value: 新值
  269. command_type: 指令类型
  270. 返回:
  271. 是否发送成功
  272. """
  273. # 构造签名和请求数据
  274. timestamp = int(time.time()) # 生成时间戳
  275. record_obj = {
  276. "project_id": PROJECT_ID_FOR_CALLBACK, # 项目ID
  277. "item": item, # 参数项名称
  278. "old_value": old_value, # 旧值
  279. "new_value": new_value, # 新值
  280. "command_type": command_type # 指令类型
  281. }
  282. record_data = json.dumps([record_obj]) # 生成签名数据
  283. signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp) # 生成签名
  284. url = f"{PLC_URL}?sign={signature}&timestamp={timestamp}" # 生成请求URL
  285. payload = [record_obj]
  286. logger.info(f"[{device_name}] PLC指令 {item} 从 {old_value} 到 {new_value}")
  287. logger.debug(f"[{device_name}] 签名数据 {record_data}")
  288. logger.debug(f"[{device_name}] 签名值 {signature}")
  289. # 重试机制
  290. max_retries, retry_interval = 3, 60 # 重试次数 重试间隔
  291. for attempt in range(1, max_retries + 1):
  292. try:
  293. logger.info(f"[{device_name}] 发送PLC指令 尝试 {attempt}/{max_retries}")
  294. response = requests.post(url, json=payload, timeout=15) # 发送PLC指令 请求头 请求体 超时时间
  295. response_json = response.json()
  296. if response_json.get('code') == 200:
  297. logger.info(f"[{device_name}] PLC指令发送成功 响应 {response_json}")
  298. return True
  299. else:
  300. logger.error(f"[{device_name}] PLC指令发送失败 {response_json.get('msg', '未知错误')}")
  301. except requests.exceptions.RequestException as e:
  302. logger.error(f"[{device_name}] PLC指令网络错误 {e}")
  303. except Exception as e:
  304. logger.error(f"[{device_name}] PLC指令未知错误 {e}")
  305. if attempt < max_retries: # 重试次数 小于 最大重试次数
  306. logger.info(f"[{device_name}] 等待{retry_interval}秒后重试")
  307. time.sleep(retry_interval)
  308. logger.error(f"[{device_name}] PLC指令发送失败,已达最大重试次数")
  309. return False
  310. def send_decision_to_callback(type_name, **kwargs):
  311. """
  312. 发送决策结果到回调接口
  313. 参数:
  314. type_name: 设备类型名称
  315. **kwargs: 决策结果数据
  316. 返回:
  317. use_model状态值: 1表示开启模型,0表示关闭模型,None表示发送失败
  318. """
  319. payload = {"list": [{"type": type_name, "project_id": PROJECT_ID_FOR_CALLBACK, **kwargs}]} # 请求负载 设备类型 项目ID 决策结果数据
  320. logger.info(f"[{type_name}] 发送决策数据\n{json.dumps(payload, indent=2, ensure_ascii=False)}")
  321. max_retries, retry_interval = 3, 60 # 重试次数 重试间隔
  322. for attempt in range(1, max_retries + 1):
  323. try:
  324. logger.info(f"[{type_name}] 发送回调 尝试 {attempt}/{max_retries}")
  325. response = requests.post(CALLBACK_URL, headers=HEADERS, json=payload, timeout=15) # 发送回调 请求头 请求体 超时时间
  326. response.raise_for_status()
  327. response_json = response.json()
  328. logger.info(f"[{type_name}] 回调发送成功 响应 {response.text}")
  329. # 提取返回的 data 字段,表示 use_model 状态(1=开启,0=关闭)
  330. use_model_status = response_json.get('data')
  331. logger.info(f"[{type_name}] 服务器返回 use_model 状态: {use_model_status}")
  332. return use_model_status
  333. except requests.exceptions.RequestException as e:
  334. logger.error(f"[{type_name}] 回调发送失败 {e}")
  335. except (json.JSONDecodeError, ValueError) as e:
  336. logger.error(f"[{type_name}] 响应解析失败 {e}")
  337. if attempt < max_retries: # 重试次数 小于 最大重试次数
  338. logger.info(f"[{type_name}] 等待{retry_interval}秒后重试")
  339. time.sleep(retry_interval)
  340. logger.error(f"[{type_name}] 回调发送失败,已达最大重试次数")
  341. return None
  342. def get_device_value(payload, device_name):
  343. """
  344. 从API获取设备数据项的当前值
  345. 参数:
  346. payload: 请求负载
  347. device_name: 设备名称
  348. 返回:
  349. 数据值或None
  350. """
  351. try:
  352. response = requests.post(API_URL, headers=HEADERS, json=[payload], timeout=10) # 发送请求 请求头 请求体 超时时间
  353. response.raise_for_status()
  354. api_response = response.json() # 解析响应
  355. if api_response.get("code") == 200 and api_response.get("data"):
  356. val_str = api_response["data"][0].get("val") # 获取数据值
  357. if val_str is not None:
  358. return float(val_str)
  359. else:
  360. logger.error(f"[{device_name}] 获取数据失败 {payload['deviceItems']} {api_response.get('msg', '未知错误')}") # 日志 设备名称 请求负载 响应
  361. except requests.exceptions.RequestException as e:
  362. logger.error(f"[{device_name}] API网络错误 {payload['deviceItems']} {e}") # 日志 设备名称 请求负载 错误
  363. except (json.JSONDecodeError, ValueError, IndexError) as e:
  364. logger.error(f"[{device_name}] API数据解析错误 {payload['deviceItems']} {e}") # 日志 设备名称 请求负载 错误
  365. return None
  366. # 设备监控主循环
  367. def monitor_device(device):
  368. """
  369. 单个设备的监控循环
  370. 完整流程:
  371. 1. 等待触发条件
  372. 2. 收集稳定数据
  373. 3. 执行决策计算
  374. 4. 发送控制指令
  375. 5. 等待重置信号
  376. 参数:
  377. device: 设备配置字典
  378. """
  379. name = device["name"]
  380. threading.current_thread().name = name
  381. logger.info("监控线程启动")
  382. # 加载设备历史状态
  383. device_state = device_states.get(name, {}) # 设备状态
  384. model_prev_L_s = device_state.get('model_prev_L_s') # 过滤时间 上一轮
  385. model_prev_t_bw_s = device_state.get('model_prev_t_bw_s') # 反洗时间 上一轮
  386. last_cycle_end_time_str = device_state.get('last_cycle_end_time') # 上次运行结束时间
  387. # 解析上次运行结束时间
  388. last_cycle_end_time = None # 上次运行结束时间
  389. if last_cycle_end_time_str:
  390. try:
  391. last_cycle_end_time = datetime.strptime(last_cycle_end_time_str, DATETIME_FORMAT) # 上次运行结束时间
  392. logger.info(f"历史状态加载成功,上次运行时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
  393. except ValueError:
  394. logger.warning(f"时间戳解析失败 {last_cycle_end_time_str}")
  395. else:
  396. logger.info("首次运行,无历史状态")
  397. # 主循环
  398. while True:
  399. try:
  400. # 阶段1: 等待触发条件 (控制字=95)
  401. logger.info(f"等待触发 控制字需等于 {TRIGGER_VALUE}")
  402. while True:
  403. control_value = get_device_value(device["control_payload"], name) # 控制字
  404. if control_value is not None and int(control_value) == TRIGGER_VALUE: # 控制字 等于 触发值 95
  405. logger.info("触发条件满足,开始等待控制字变为26")
  406. break
  407. time.sleep(POLL_INTERVAL)
  408. # 阶段1.5: 等待控制字变为26
  409. logger.info("等待控制字变为26")
  410. while True:
  411. control_value = get_device_value(device["control_payload"], name) # 控制字
  412. if control_value is not None and int(control_value) == 26: # 控制字 等于 26
  413. logger.info("控制字变为26,开始收集10分钟数据")
  414. break
  415. time.sleep(POLL_INTERVAL)
  416. # 阶段2: 收集10分钟数据并计算平均值
  417. logger.info("开始收集10分钟TMP数据")
  418. collected_values = []
  419. start_collection_time = datetime.now()
  420. collection_duration = timedelta(minutes=10) # 10分钟
  421. # 日志计数器,每收集60个点打印一次,避免日志过多
  422. log_interval = 60
  423. while datetime.now() - start_collection_time < collection_duration:
  424. current_value = get_device_value(device["target_payload"], name) # 当前值
  425. control_value = get_device_value(device["control_payload"], name) # 检查控制字
  426. # 检查控制字是否保持26
  427. if control_value is not None and int(control_value) != 26:
  428. logger.warning(f"数据收集期间控制字发生变化: {control_value},停止收集")
  429. # 如果控制字变为95,说明系统重置了,需要重新开始
  430. if int(control_value) == TRIGGER_VALUE:
  431. logger.info("控制字变为95,系统重置,重新开始监控")
  432. break
  433. else:
  434. logger.info("控制字变为其他值,等待重置")
  435. break
  436. if current_value is not None:
  437. collected_values.append(current_value)
  438. # 每收集60个点或第一个点时打印日志,减少日志数量
  439. if len(collected_values) == 1 or len(collected_values) % log_interval == 0:
  440. logger.info(f"收集TMP值 {current_value:.4f} 已收集 {len(collected_values)} 个数据点")
  441. time.sleep(POLL_INTERVAL)
  442. if not collected_values:
  443. logger.warning("10分钟内未收集到有效数据,跳过本轮")
  444. # 检查控制字状态,如果已经是95则直接开始新一轮
  445. control_value = get_device_value(device["control_payload"], name)
  446. if control_value is not None and int(control_value) == TRIGGER_VALUE:
  447. logger.info("控制字已经是95,直接开始新一轮")
  448. continue
  449. else:
  450. # 等待控制字重置后再继续
  451. logger.info("等待控制字重置...")
  452. time.sleep(10) # 等待10秒
  453. continue
  454. # 阶段3: 决策计算
  455. logger.info(f"数据收集完成,共收集 {len(collected_values)} 个数据点,开始决策计算")
  456. if collected_values:
  457. # 计算平均值作为代表值
  458. average_value = sum(collected_values) / len(collected_values)
  459. logger.info(f"TMP平均值 {average_value:.4f}")
  460. # 确定历史数据查询时间范围
  461. current_decision_time = datetime.now()
  462. start_query_time = last_cycle_end_time if last_cycle_end_time else current_decision_time - timedelta(hours=48)
  463. _word_controldevice = device["control_payload"]["deviceItems"]
  464. # 查询历史极值
  465. max_tmp, min_tmp = get_tmp_extremes(device["press_pv_item"], start_query_time, current_decision_time, _word_controldevice)
  466. # 调用DQN模型获取决策建议
  467. logger.info("调用DQN决策模型")
  468. uf_bw_dict = run_uf_DQN_decide(uf_params, average_value)
  469. logger.info(f"模型决策结果 {uf_bw_dict}")
  470. # 获取当前PLC参数
  471. prod_time = get_device_value(device["production_time_payload"], name) or 3800 # 产水时间 默认3800
  472. bw_time = get_device_value(device["backwashing_payload"], name) or 100 # 反洗时间 默认100
  473. bw_per_ceb = get_device_value(device["ceb_payload"], name) or 40 # CEB 次数时间 默认40
  474. # 生成PLC指令
  475. L_s, t_bw_s = generate_plc_instructions(
  476. prod_time, bw_time, # 产水时间 反洗时间
  477. model_prev_L_s, model_prev_t_bw_s, # 过滤时间 反洗时间 上一轮
  478. uf_bw_dict["L_s"], uf_bw_dict["t_bw_s"] # 过滤时间 反洗时间 决策建议
  479. )
  480. # 计算运行指标
  481. logger.info(f"计算运行指标 TMP={average_value} L_s={L_s} t_bw_s={t_bw_s}")
  482. metrics = calc_uf_cycle_metrics(uf_params, average_value, max_tmp, min_tmp, L_s, t_bw_s) # 计算运行指标
  483. ceb_backwash_frequency = int(metrics["k_bw_per_ceb"])
  484. # 发送决策结果,并获取服务器返回的 use_model 状态
  485. use_model_status = send_decision_to_callback(
  486. type_name=name, # 设备名称
  487. water_production_time=int(L_s), # 过滤时间
  488. physical_backwash=int(t_bw_s), # 反洗时间
  489. ceb_backwash_frequency=ceb_backwash_frequency, # 化学反洗频率
  490. duration_system=int(prod_time), # 系统运行时间
  491. tmp_action=average_value, # TMP动作
  492. recovery_rate=metrics["recovery"], # 回收率
  493. ton_water_energy_kWh=metrics['ton_water_energy_kWh_per_m3'], # 吨水电耗
  494. max_permeability=metrics['max_permeability'], # 最高渗透率
  495. daily_prod_time_h=metrics['daily_prod_time_h'], # 日均产水时间
  496. ctime=current_decision_time.strftime(DATETIME_FORMAT) # 时间
  497. )
  498. # 判断是否下发PLC指令,根据服务器返回的 use_model 状态
  499. if use_model_status == 1:
  500. logger.info("模型开关已开启,检查PLC指令")
  501. # 记录当前PLC值和模型决策值
  502. current_plc_values = {
  503. 'prod_time': int(prod_time),
  504. 'bw_time': int(bw_time),
  505. 'bw_per_ceb': int(bw_per_ceb)
  506. }
  507. model_decision_values = {
  508. 'L_s': int(L_s),
  509. 't_bw_s': int(t_bw_s),
  510. 'ceb_frequency': int(ceb_backwash_frequency)
  511. }
  512. logger.info(f"当前PLC值: 产水时间={current_plc_values['prod_time']}, 反洗时间={current_plc_values['bw_time']}, CEB次数={current_plc_values['bw_per_ceb']}")
  513. logger.info(f"模型决策值: L_s={model_decision_values['L_s']}, t_bw_s={model_decision_values['t_bw_s']}, ceb_frequency={model_decision_values['ceb_frequency']}")
  514. # 检查每个参数是否需要下发指令
  515. # 检查产水时间是否需要更新
  516. if current_plc_values['prod_time'] != model_decision_values['L_s']:
  517. logger.info(f"产水时间需要更新: {current_plc_values['prod_time']} -> {model_decision_values['L_s']}")
  518. send_plc_update(name, device["production_time_payload"]["deviceItems"], str(prod_time), str(model_decision_values['L_s']), 1)
  519. else:
  520. logger.info(f"产水时间无需更新: {current_plc_values['prod_time']}")
  521. # 检查反洗时间是否需要更新
  522. if current_plc_values['bw_time'] != model_decision_values['t_bw_s']:
  523. logger.info(f"反洗时间需要更新: {current_plc_values['bw_time']} -> {model_decision_values['t_bw_s']}")
  524. send_plc_update(name, device["backwashing_payload"]["deviceItems"], str(bw_time), str(model_decision_values['t_bw_s']), 4)
  525. else:
  526. logger.info(f"反洗时间无需更新: {current_plc_values['bw_time']}")
  527. # 检查CEB次数是否需要更新
  528. if current_plc_values['bw_per_ceb'] != model_decision_values['ceb_frequency']:
  529. logger.info(f"CEB次数需要更新: {current_plc_values['bw_per_ceb']} -> {model_decision_values['ceb_frequency']}")
  530. send_plc_update(name, device["ceb_payload"]["deviceItems"], str(bw_per_ceb), str(model_decision_values['ceb_frequency']), 2)
  531. else:
  532. logger.info(f"CEB次数无需更新: {current_plc_values['bw_per_ceb']}")
  533. elif use_model_status == 0:
  534. logger.info("服务器返回 use_model=0,模型开关已关闭,跳过PLC指令")
  535. else:
  536. logger.warning("回调发送失败,无法获取 use_model 状态,跳过PLC指令")
  537. # 保存运行状态
  538. model_prev_L_s = L_s # 过滤时间 上一轮
  539. model_prev_t_bw_s = t_bw_s # 反洗时间 上一轮
  540. last_cycle_end_time = current_decision_time # 上次运行结束时间
  541. # 获取配置的TMP历史记录数量
  542. current_config = get_current_config()
  543. tmp_history_count = current_config.get('system', {}).get('tmp_history_count', 5)
  544. # 从最新的内存缓存中读取当前设备状态(确保获取最新的历史记录)
  545. current_device_state = device_states.get(name, {})
  546. recent_tmp_values = current_device_state.get('recent_tmp_values', [])
  547. recent_tmp_values.append(round(average_value, 4))
  548. # 只保留最近N次
  549. recent_tmp_values = recent_tmp_values[-tmp_history_count:]
  550. state_to_save = {
  551. 'model_prev_L_s': model_prev_L_s, # 过滤时间 上一轮
  552. 'model_prev_t_bw_s': model_prev_t_bw_s, # 反洗时间 上一轮
  553. 'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT), # 上次运行结束时间
  554. 'recent_tmp_values': recent_tmp_values # 最近N次TMP平均值(新增)
  555. }
  556. save_device_state(name, state_to_save) # 保存设备状态
  557. logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
  558. logger.info(f"最近{tmp_history_count}次TMP记录: {recent_tmp_values}")
  559. # 阶段4: 等待重置
  560. logger.info(f"等待重置 控制字需重新等于 {TRIGGER_VALUE}")
  561. # 等待一段时间,确保不是立即开始新一轮
  562. time.sleep(5) # 等待5秒
  563. while True:
  564. control_value = get_device_value(device["control_payload"], name) # 控制字
  565. if control_value is not None and int(control_value) == TRIGGER_VALUE: # 控制字 等于 触发值
  566. logger.info("完整周期结束,开始新一轮")
  567. break
  568. time.sleep(POLL_INTERVAL)
  569. logger.info(f"{name} 本轮完成\n")
  570. except Exception as e:
  571. logger.critical(f"监控循环异常 {e}", exc_info=True)
  572. logger.info("等待60秒后重试")
  573. time.sleep(60)
  574. # 程序主入口
  575. def main():
  576. """
  577. 主函数
  578. 功能:
  579. 1. 加载设备历史状态
  580. 2. 为每个设备启动独立监控线程
  581. 3. 保持主线程运行
  582. """
  583. logger.info("========================================")
  584. logger.info("超滤并行监控服务启动")
  585. logger.info("========================================")
  586. # 加载设备历史状态
  587. load_device_states()
  588. # 为每个设备创建监控线程
  589. threads = []
  590. for device_config in DEVICE_SEQUENCE:
  591. thread = threading.Thread(target=monitor_device, args=(device_config,), daemon=True)
  592. threads.append(thread)
  593. thread.start()
  594. logger.info(f"设备 {device_config['name']} 监控线程已启动")
  595. # 保持主线程运行
  596. try:
  597. while any(t.is_alive() for t in threads):
  598. time.sleep(1)
  599. except KeyboardInterrupt:
  600. logger.info("检测到中断信号,程序退出")
  601. def test_get_tmp_extremes():
  602. """
  603. 测试get_tmp_extremes函数的API调用
  604. """
  605. print("=" * 50)
  606. print("测试get_tmp_extremes API调用")
  607. print("=" * 50)
  608. # 设置测试参数
  609. test_item_name = "C.M.UF1_DB@press_PV" # 测试数据项
  610. test_word_control = "C.M.UF1_DB@word_control" # 测试控制字段
  611. # 设置测试时间范围(最近24小时)
  612. end_time = datetime.now()
  613. start_time = end_time - timedelta(hours=24)
  614. print(f"测试参数:")
  615. print(f" 数据项: {test_item_name}")
  616. print(f" 控制字段: {test_word_control}")
  617. print(f" 开始时间: {start_time.strftime(DATETIME_FORMAT)}")
  618. print(f" 结束时间: {end_time.strftime(DATETIME_FORMAT)}")
  619. print()
  620. try:
  621. # 调用函数
  622. max_val, min_val = get_tmp_extremes(test_item_name, start_time, end_time, test_word_control)
  623. print("测试结果:")
  624. if max_val is not None and min_val is not None:
  625. print(f" API调用成功")
  626. print(f" 最大值: {max_val}")
  627. print(f" 最小值: {min_val}")
  628. else:
  629. print(f" API调用失败或未返回有效数据")
  630. print(f" 最大值: {max_val}")
  631. print(f" 最小值: {min_val}")
  632. except Exception as e:
  633. print(f" 测试过程中发生异常: {e}")
  634. print("=" * 50)
  635. if __name__ == "__main__":
  636. # 运行测试用例
  637. # test_get_tmp_extremes()
  638. # 运行主程序
  639. main()