loop_main.py 22 KB


  1. # 标准库导入
  2. import time
  3. import json
  4. import os
  5. import statistics
  6. import threading
  7. import hashlib
  8. from datetime import datetime, timedelta
  9. import logging
  10. from logging.handlers import RotatingFileHandler
  11. # 第三方库导入
  12. import pymysql
  13. import requests
  14. # 自定义模块导入
  15. from DQN_env import UFParams
  16. from DQN_decide import run_uf_DQN_decide, generate_plc_instructions, calc_uf_cycle_metrics
  17. # 日志系统配置
  18. logger = logging.getLogger(__name__)
  19. logger.setLevel(logging.INFO)
  20. # 日志输出格式
  21. formatter = logging.Formatter(
  22. '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
  23. datefmt='%Y-%m-%d %H:%M:%S'
  24. )
  25. # 文件日志处理器,单个文件最大5MB,保留3个备份
  26. file_handler = RotatingFileHandler('monitor_service.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
  27. file_handler.setFormatter(formatter)
  28. # 控制台日志处理器
  29. console_handler = logging.StreamHandler()
  30. console_handler.setFormatter(formatter)
  31. # 添加处理器
  32. logger.addHandler(file_handler)
  33. logger.addHandler(console_handler)
  34. # 配置加载函数
  35. def load_config(config_file='config.json'):
  36. """
  37. 从JSON配置文件加载系统配置
  38. 参数:
  39. config_file: 配置文件路径
  40. 返回:
  41. 配置字典
  42. 异常:
  43. 配置文件不存在或格式错误时抛出异常
  44. """
  45. try:
  46. with open(config_file, 'r', encoding='utf-8') as f:
  47. return json.load(f)
  48. except FileNotFoundError:
  49. logger.critical(f"配置文件未找到 {config_file}")
  50. raise
  51. except json.JSONDecodeError as e:
  52. logger.critical(f"配置文件格式错误 {config_file}: {e}")
  53. raise
  54. def get_current_config():
  55. """
  56. 获取当前配置,支持运行时配置动态变更
  57. """
  58. return load_config()
  59. # 初始化配置
  60. config = load_config()
  61. # 全局配置参数
  62. # API接口配置
  63. API_BASE_URL = config['api']['base_url']
  64. API_URL = API_BASE_URL + config['api']['current_data_endpoint']
  65. CALLBACK_URL = API_BASE_URL + config['api']['callback_endpoint']
  66. PLC_URL = API_BASE_URL + config['api']['plc_endpoint']
  67. # HTTP请求头
  68. HEADERS = {
  69. "Content-Type": "application/json",
  70. "JWT-TOKEN": config['api']['jwt_token']
  71. }
  72. # MySQL数据库配置,优先读取环境变量
  73. DB_USER = os.getenv('DB_USERNAME', config['database']['user'])
  74. DB_PASSWORD = os.getenv('DB_PASSWORD', config['database']['password'])
  75. DB_HOST = os.getenv('DB_HOST', config['database']['host'])
  76. DB_NAME = os.getenv('DB_DATABASE', config['database']['database'])
  77. DB_PORT = int(os.getenv('DB_PORT', str(config['database']['port'])))
  78. HISTORY_TABLE_NAME = config['database']['table_name']
  79. # 超滤系统参数
  80. uf_params = UFParams()
  81. PROJECT_ID_FOR_CALLBACK = config['scada']['project_id']
  82. SCADA_SECRET = config['scada']['secret']
  83. # 监控流程参数
  84. TRIGGER_VALUE = config['system']['trigger_value']
  85. NUM_VALUES_TO_COLLECT = config['system']['num_values_to_collect']
  86. POLL_INTERVAL = config['system']['poll_interval']
  87. # 设备列表
  88. DEVICE_SEQUENCE = config['devices']
  89. # 状态持久化配置
  90. STATE_FILE = 'device_states.json'
  91. _state_lock = threading.Lock()
  92. device_states = {}
  93. DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
  94. # 状态持久化函数
  95. def load_device_states():
  96. """
  97. 从状态文件加载所有设备的运行状态
  98. """
  99. global device_states
  100. with _state_lock:
  101. try:
  102. if os.path.exists(STATE_FILE):
  103. with open(STATE_FILE, 'r', encoding='utf-8') as f:
  104. content = f.read()
  105. if content:
  106. device_states = json.loads(content)
  107. logger.info(f"状态文件加载成功 {STATE_FILE}")
  108. else:
  109. logger.warning(f"状态文件为空 {STATE_FILE}")
  110. device_states = {}
  111. else:
  112. logger.info(f"状态文件不存在,首次运行 {STATE_FILE}")
  113. device_states = {}
  114. except (json.JSONDecodeError, IOError) as e:
  115. logger.error(f"状态文件加载失败 {STATE_FILE}: {e}")
  116. device_states = {}
  117. def save_device_state(device_name, state_data):
  118. """
  119. 保存单个设备的运行状态到文件
  120. 参数:
  121. device_name: 设备名称
  122. state_data: 设备状态数据字典
  123. """
  124. with _state_lock:
  125. try:
  126. # 读取现有状态
  127. full_states = {}
  128. if os.path.exists(STATE_FILE):
  129. with open(STATE_FILE, 'r', encoding='utf-8') as f:
  130. content = f.read()
  131. if content:
  132. full_states = json.loads(content)
  133. # 更新指定设备状态
  134. full_states[device_name] = state_data
  135. # 写回文件
  136. with open(STATE_FILE, 'w', encoding='utf-8') as f:
  137. json.dump(full_states, f, indent=4, ensure_ascii=False)
  138. # 更新内存缓存
  139. global device_states
  140. device_states[device_name] = state_data
  141. logger.info(f"[{device_name}] 状态保存成功")
  142. except (json.JSONDecodeError, IOError) as e:
  143. logger.error(f"[{device_name}] 状态保存失败: {e}")
  144. # 核心业务函数
  145. def create_db_connection():
  146. """
  147. 创建MySQL数据库连接
  148. 返回:
  149. 连接对象或None
  150. """
  151. try:
  152. connection = pymysql.connect(
  153. host=DB_HOST, user=DB_USER, password=DB_PASSWORD, database=DB_NAME,
  154. port=DB_PORT, charset='utf8mb4',
  155. cursorclass=pymysql.cursors.DictCursor
  156. )
  157. logger.debug("数据库连接成功")
  158. return connection
  159. except pymysql.MySQLError as e:
  160. logger.error(f"数据库连接失败: {e}")
  161. return None
  162. def get_tmp_extremes(item_name, start_time, end_time, word_control):
  163. """
  164. 查询历史数据中指定时间范围内的跨膜压差极值
  165. 参数:
  166. item_name: 数据项名称
  167. start_time: 开始时间
  168. end_time: 结束时间
  169. word_control: 控制字段名
  170. 返回:
  171. (最大值, 最小值) 或 (None, None)
  172. """
  173. start_time_str = start_time.strftime(DATETIME_FORMAT)
  174. end_time_str = end_time.strftime(DATETIME_FORMAT)
  175. query = f"""
  176. SELECT
  177. MAX(val) AS max_val,
  178. MIN(val) AS min_val
  179. FROM {HISTORY_TABLE_NAME}
  180. WHERE project_id = %s
  181. AND item_name = %s
  182. AND h_time IN (
  183. SELECT h_time
  184. FROM {HISTORY_TABLE_NAME}
  185. WHERE project_id = %s
  186. AND item_name = %s
  187. AND val = 26
  188. AND h_time BETWEEN %s AND %s
  189. )
  190. """
  191. logger.info(f"查询历史极值 {item_name} 从 {start_time_str} 到 {end_time_str}")
  192. logger.debug(query)
  193. db_connection = create_db_connection()
  194. if not db_connection:
  195. return None, None
  196. try:
  197. with db_connection.cursor() as cursor:
  198. cursor.execute(query, (PROJECT_ID_FOR_CALLBACK, item_name, PROJECT_ID_FOR_CALLBACK, word_control, start_time_str, end_time_str))
  199. result = cursor.fetchone()
  200. logger.debug(f"查询结果: {result}")
  201. if result and result['max_val'] is not None and result['min_val'] is not None:
  202. max_val = float(result['max_val'])
  203. min_val = float(result['min_val'])
  204. logger.info(f"查询成功 最大值={max_val} 最小值={min_val}")
  205. return max_val, min_val
  206. else:
  207. logger.warning("查询未返回有效数据")
  208. return None, None
  209. except pymysql.MySQLError as e:
  210. logger.error(f"数据库查询错误: {e}")
  211. return None, None
  212. finally:
  213. if db_connection and db_connection.open:
  214. db_connection.close()
  215. def generate_md5_signature(record_data, secret, timestamp):
  216. """
  217. 生成PLC请求的MD5签名
  218. """
  219. cal_str = f"{record_data}{secret}{timestamp}"
  220. return hashlib.md5(cal_str.encode('utf-8')).hexdigest().upper()
  221. def send_plc_update(device_name, item, old_value, new_value, command_type):
  222. """
  223. 向PLC发送参数更新指令
  224. 参数:
  225. device_name: 设备名称
  226. item: 参数项名称
  227. old_value: 旧值
  228. new_value: 新值
  229. command_type: 指令类型
  230. 返回:
  231. 是否发送成功
  232. """
  233. # 构造签名和请求数据
  234. timestamp = int(time.time()) # 生成时间戳
  235. record_obj = {
  236. "project_id": PROJECT_ID_FOR_CALLBACK, # 项目ID
  237. "item": item, # 参数项名称
  238. "old_value": old_value, # 旧值
  239. "new_value": new_value, # 新值
  240. "command_type": command_type # 指令类型
  241. }
  242. record_data = json.dumps([record_obj]) # 生成签名数据
  243. signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp) # 生成签名
  244. url = f"{PLC_URL}?sign={signature}&timestamp={timestamp}" # 生成请求URL
  245. payload = [record_obj]
  246. logger.info(f"[{device_name}] PLC指令 {item} 从 {old_value} 到 {new_value}")
  247. logger.debug(f"[{device_name}] 签名数据 {record_data}")
  248. logger.debug(f"[{device_name}] 签名值 {signature}")
  249. # 重试机制
  250. max_retries, retry_interval = 3, 60 # 重试次数 重试间隔
  251. for attempt in range(1, max_retries + 1):
  252. try:
  253. logger.info(f"[{device_name}] 发送PLC指令 尝试 {attempt}/{max_retries}")
  254. response = requests.post(url, json=payload, timeout=15) # 发送PLC指令 请求头 请求体 超时时间
  255. response_json = response.json()
  256. if response_json.get('code') == 200:
  257. logger.info(f"[{device_name}] PLC指令发送成功 响应 {response_json}")
  258. return True
  259. else:
  260. logger.error(f"[{device_name}] PLC指令发送失败 {response_json.get('msg', '未知错误')}")
  261. except requests.exceptions.RequestException as e:
  262. logger.error(f"[{device_name}] PLC指令网络错误 {e}")
  263. except Exception as e:
  264. logger.error(f"[{device_name}] PLC指令未知错误 {e}")
  265. if attempt < max_retries: # 重试次数 小于 最大重试次数
  266. logger.info(f"[{device_name}] 等待{retry_interval}秒后重试")
  267. time.sleep(retry_interval)
  268. logger.error(f"[{device_name}] PLC指令发送失败,已达最大重试次数")
  269. return False
  270. def send_decision_to_callback(type_name, **kwargs):
  271. """
  272. 发送决策结果到回调接口
  273. 参数:
  274. type_name: 设备类型名称
  275. **kwargs: 决策结果数据
  276. 返回:
  277. 是否发送成功
  278. """
  279. payload = {"list": [{"type": type_name, "project_id": PROJECT_ID_FOR_CALLBACK, **kwargs}]} # 请求负载 设备类型 项目ID 决策结果数据
  280. logger.info(f"[{type_name}] 发送决策数据\n{json.dumps(payload, indent=2, ensure_ascii=False)}")
  281. max_retries, retry_interval = 3, 60 # 重试次数 重试间隔
  282. for attempt in range(1, max_retries + 1):
  283. try:
  284. logger.info(f"[{type_name}] 发送回调 尝试 {attempt}/{max_retries}")
  285. response = requests.post(CALLBACK_URL, headers=HEADERS, json=payload, timeout=15) # 发送回调 请求头 请求体 超时时间
  286. response.raise_for_status()
  287. logger.info(f"[{type_name}] 回调发送成功 响应 {response.text}")
  288. return True
  289. except requests.exceptions.RequestException as e:
  290. logger.error(f"[{type_name}] 回调发送失败 {e}")
  291. if attempt < max_retries: # 重试次数 小于 最大重试次数
  292. logger.info(f"[{type_name}] 等待{retry_interval}秒后重试")
  293. time.sleep(retry_interval)
  294. logger.error(f"[{type_name}] 回调发送失败,已达最大重试次数")
  295. return False
  296. def get_device_value(payload, device_name):
  297. """
  298. 从API获取设备数据项的当前值
  299. 参数:
  300. payload: 请求负载
  301. device_name: 设备名称
  302. 返回:
  303. 数据值或None
  304. """
  305. try:
  306. response = requests.post(API_URL, headers=HEADERS, json=[payload], timeout=10) # 发送请求 请求头 请求体 超时时间
  307. response.raise_for_status()
  308. api_response = response.json() # 解析响应
  309. if api_response.get("code") == 200 and api_response.get("data"):
  310. val_str = api_response["data"][0].get("val") # 获取数据值
  311. if val_str is not None:
  312. return float(val_str)
  313. else:
  314. logger.error(f"[{device_name}] 获取数据失败 {payload['deviceItems']} {api_response.get('msg', '未知错误')}") # 日志 设备名称 请求负载 响应
  315. except requests.exceptions.RequestException as e:
  316. logger.error(f"[{device_name}] API网络错误 {payload['deviceItems']} {e}") # 日志 设备名称 请求负载 错误
  317. except (json.JSONDecodeError, ValueError, IndexError) as e:
  318. logger.error(f"[{device_name}] API数据解析错误 {payload['deviceItems']} {e}") # 日志 设备名称 请求负载 错误
  319. return None
  320. # 设备监控主循环
  321. def monitor_device(device):
  322. """
  323. 单个设备的监控循环
  324. 完整流程:
  325. 1. 等待触发条件
  326. 2. 收集稳定数据
  327. 3. 执行决策计算
  328. 4. 发送控制指令
  329. 5. 等待重置信号
  330. 参数:
  331. device: 设备配置字典
  332. """
  333. name = device["name"]
  334. threading.current_thread().name = name
  335. logger.info("监控线程启动")
  336. # 加载设备历史状态
  337. device_state = device_states.get(name, {}) # 设备状态
  338. model_prev_L_s = device_state.get('model_prev_L_s') # 过滤时间 上一轮
  339. model_prev_t_bw_s = device_state.get('model_prev_t_bw_s') # 反洗时间 上一轮
  340. last_cycle_end_time_str = device_state.get('last_cycle_end_time') # 上次运行结束时间
  341. # 解析上次运行结束时间
  342. last_cycle_end_time = None # 上次运行结束时间
  343. if last_cycle_end_time_str:
  344. try:
  345. last_cycle_end_time = datetime.strptime(last_cycle_end_time_str, DATETIME_FORMAT) # 上次运行结束时间
  346. logger.info(f"历史状态加载成功,上次运行时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
  347. except ValueError:
  348. logger.warning(f"时间戳解析失败 {last_cycle_end_time_str}")
  349. else:
  350. logger.info("首次运行,无历史状态")
  351. # 主循环
  352. while True:
  353. try:
  354. # 阶段1: 等待触发条件
  355. logger.info(f"等待触发 控制字需等于 {TRIGGER_VALUE}")
  356. while True:
  357. control_value = get_device_value(device["control_payload"], name) # 控制字
  358. if control_value is not None and int(control_value) == TRIGGER_VALUE: # 控制字 等于 触发值
  359. logger.info("触发条件满足")
  360. break
  361. time.sleep(POLL_INTERVAL)
  362. # 阶段2: 收集数据
  363. logger.info(f"开始收集TMP数据 需要 {NUM_VALUES_TO_COLLECT} 个有效数据点")
  364. collected_values = []
  365. last_known_value = get_device_value(device["target_payload"], name) # 上次已知值
  366. if last_known_value is not None:
  367. logger.info(f"TMP基准值 {last_known_value}")
  368. # 循环收集数据点,直到达到目标数量
  369. while len(collected_values) < NUM_VALUES_TO_COLLECT: # 收集数据点 直到达到目标数量
  370. current_value = get_device_value(device["target_payload"], name) # 当前值
  371. if current_value is None:
  372. time.sleep(POLL_INTERVAL)
  373. continue
  374. # 只有当数值发生变化时才记录
  375. if current_value != last_known_value: # 当前值 不等于 上次已知值
  376. collected_values.append(current_value)
  377. logger.info(f"TMP变化 {last_known_value:.4f} 到 {current_value:.4f} 已收集 {len(collected_values)}/{NUM_VALUES_TO_COLLECT}")
  378. last_known_value = current_value
  379. time.sleep(POLL_INTERVAL)
  380. else:
  381. logger.warning("无法获取TMP基准值,跳过本轮")
  382. continue
  383. # 阶段3: 决策计算
  384. logger.info("数据收集完成,开始决策计算")
  385. if collected_values:
  386. # 计算中位数作为代表值
  387. median_value = statistics.median(sorted(collected_values))
  388. logger.info(f"TMP中位数 {median_value:.4f}")
  389. # 确定历史数据查询时间范围
  390. current_decision_time = datetime.now()
  391. start_query_time = last_cycle_end_time if last_cycle_end_time else current_decision_time - timedelta(hours=48)
  392. _word_controldevice = device["control_payload"]["deviceItems"]
  393. # 查询历史极值
  394. max_tmp, min_tmp = get_tmp_extremes(device["press_pv_item"], start_query_time, current_decision_time, _word_controldevice)
  395. # 调用DQN模型获取决策建议
  396. logger.info("调用DQN决策模型")
  397. uf_bw_dict = run_uf_DQN_decide(uf_params, median_value)
  398. logger.info(f"模型决策结果 {uf_bw_dict}")
  399. # 获取当前PLC参数
  400. prod_time = get_device_value(device["production_time_payload"], name) or 3800 # 产水时间 默认3800
  401. bw_time = get_device_value(device["backwashing_payload"], name) or 100 # 反洗时间 默认100
  402. # 生成PLC指令
  403. L_s, t_bw_s = generate_plc_instructions(
  404. prod_time, bw_time, # 产水时间 反洗时间
  405. model_prev_L_s, model_prev_t_bw_s, # 过滤时间 反洗时间 上一轮
  406. uf_bw_dict["L_s"], uf_bw_dict["t_bw_s"] # 过滤时间 反洗时间 决策建议
  407. )
  408. # 计算运行指标
  409. logger.info(f"计算运行指标 TMP={median_value} L_s={L_s} t_bw_s={t_bw_s}")
  410. metrics = calc_uf_cycle_metrics(uf_params, median_value, max_tmp, min_tmp, L_s, t_bw_s) # 计算运行指标
  411. # 发送决策结果
  412. send_decision_to_callback(
  413. type_name=name, # 设备名称
  414. water_production_time=int(L_s), # 过滤时间
  415. physical_backwash=int(t_bw_s), # 反洗时间
  416. ceb_backwash_frequency=int(metrics["k_bw_per_ceb"]), # 化学反洗频率
  417. duration_system=int(prod_time), # 系统运行时间
  418. tmp_action=median_value, # TMP动作
  419. recovery_rate=metrics["recovery"], # 回收率
  420. ton_water_energy_kWh=metrics['ton_water_energy_kWh_per_m3'], # 吨水电耗
  421. max_permeability=metrics['max_permeability'], # 最高渗透率
  422. daily_prod_time_h=metrics['daily_prod_time_h'], # 日均产水时间
  423. ctime=current_decision_time.strftime(DATETIME_FORMAT) # 时间
  424. )
  425. # 判断是否下发PLC指令
  426. if get_current_config()['system']['use_model'] == 1:
  427. logger.info("模型开关已开启,下发PLC指令")
  428. send_plc_update(name, device["production_time_payload"]["deviceItems"], str(prod_time), str(int(L_s)), 1) # 过滤时间
  429. send_plc_update(name, device["backwashing_payload"]["deviceItems"], str(bw_time), str(int(t_bw_s)), 2) # 反洗时间
  430. else:
  431. logger.info("模型开关已关闭,跳过PLC指令")
  432. # 保存运行状态
  433. model_prev_L_s = L_s # 过滤时间 上一轮
  434. model_prev_t_bw_s = t_bw_s # 反洗时间 上一轮
  435. last_cycle_end_time = current_decision_time # 上次运行结束时间
  436. state_to_save = {
  437. 'model_prev_L_s': model_prev_L_s, # 过滤时间 上一轮
  438. 'model_prev_t_bw_s': model_prev_t_bw_s, # 反洗时间 上一轮
  439. 'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT) # 上次运行结束时间
  440. }
  441. save_device_state(name, state_to_save) # 保存设备状态
  442. logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
  443. # 阶段4: 等待重置
  444. logger.info(f"等待重置 控制字需不等于 {TRIGGER_VALUE}")
  445. while True:
  446. control_value = get_device_value(device["control_payload"], name) # 控制字
  447. if control_value is None or int(control_value) != TRIGGER_VALUE: # 控制字 不等于 触发值
  448. logger.info("重置条件满足,开始新一轮")
  449. break
  450. time.sleep(POLL_INTERVAL)
  451. logger.info(f"{name} 本轮完成\n")
  452. except Exception as e:
  453. logger.critical(f"监控循环异常 {e}", exc_info=True)
  454. logger.info("等待60秒后重试")
  455. time.sleep(60)
  456. # 程序主入口
  457. def main():
  458. """
  459. 主函数
  460. 功能:
  461. 1. 加载设备历史状态
  462. 2. 为每个设备启动独立监控线程
  463. 3. 保持主线程运行
  464. """
  465. logger.info("========================================")
  466. logger.info("超滤并行监控服务启动")
  467. logger.info("========================================")
  468. # 加载设备历史状态
  469. load_device_states()
  470. # 为每个设备创建监控线程
  471. threads = []
  472. for device_config in DEVICE_SEQUENCE:
  473. thread = threading.Thread(target=monitor_device, args=(device_config,), daemon=True)
  474. threads.append(thread)
  475. thread.start()
  476. logger.info(f"设备 {device_config['name']} 监控线程已启动")
  477. # 保持主线程运行
  478. try:
  479. while any(t.is_alive() for t in threads):
  480. time.sleep(1)
  481. except KeyboardInterrupt:
  482. logger.info("检测到中断信号,程序退出")
  483. if __name__ == "__main__":
  484. main()