main_simple.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741
  1. # -*- coding: utf-8 -*-
  2. """
  3. RO膜污染监控与CIP预测 - 基于预测数据的最优时机分析
  4. 核心功能:分析RO膜压差预测数据,计算最优CIP清洗时机
  5. CIP时机选择策略:
  6. 1. 最早时机策略:一段或二段任一需要CIP时即触发
  7. 2. 最晚时机策略:等待所有段都需要CIP时触发
  8. 3. 加权平均策略:综合两段污染程度,污染严重段权重更大
  9. 4. 污染严重程度策略:基于k值最大的段决策
  10. 使用方法:
  11. main(strategy=3) # 使用策略3
  12. main(strategy=1, start_date='2025-08-26 00:00:00') # 指定策略和时间
  13. """
  14. import pandas as pd
  15. import numpy as np
  16. from sklearn.linear_model import LinearRegression
  17. from fouling_model_0922.predict import Predictor
  18. import warnings
  19. from datetime import datetime, timedelta
  20. from logging_system import CIPAnalysisLogger
  21. import json
  22. import requests
  23. import time
  24. import os
  25. warnings.filterwarnings('ignore', category=FutureWarning)
  26. # 加载配置文件
  27. def load_config():
  28. """
  29. 加载配置文件
  30. Returns:
  31. dict: 配置字典,失败时返回None
  32. """
  33. config_path = os.path.join(os.path.dirname(__file__), 'config.json')
  34. try:
  35. with open(config_path, 'r', encoding='utf-8') as f:
  36. config = json.load(f)
  37. print(f"配置文件加载成功: {config_path}")
  38. return config
  39. except Exception as e:
  40. print(f"配置文件加载失败: {e}")
  41. return None
  42. # 加载配置
  43. config = load_config()
  44. def update_cip_history_in_config(result_df):
  45. """
  46. 保存CIP预测结果到配置文件
  47. 功能:将预测的CIP时机写入config.json的cip_times字段的predicted_time
  48. Args:
  49. result_df: DataFrame,包含机组类型和CIP时机两列
  50. Returns:
  51. bool: 保存成功返回True,失败返回False
  52. 注意:
  53. 此函数已废弃,smart_monitor会自动保存predicted_time
  54. 保留此函数仅为兼容性
  55. """
  56. global config
  57. if config is None:
  58. print("配置文件未加载")
  59. return False
  60. try:
  61. config_path = os.path.join(os.path.dirname(__file__), 'config.json')
  62. with open(config_path, 'r', encoding='utf-8') as f:
  63. current_config = json.load(f)
  64. # 遍历结果,写入配置
  65. updated_units = []
  66. for _, row in result_df.iterrows():
  67. if pd.notna(row["CIP时机"]):
  68. unit_name = row["机组类型"]
  69. cip_time = row["CIP时机"].strftime('%Y-%m-%d %H:%M:%S')
  70. if unit_name in current_config.get('cip_times', {}):
  71. # 新格式:只更新predicted_time
  72. if isinstance(current_config['cip_times'][unit_name], dict):
  73. current_config['cip_times'][unit_name]['predicted_time'] = cip_time
  74. else:
  75. # 兼容旧格式:转换为新格式
  76. current_config['cip_times'][unit_name] = {
  77. 'actual_time': current_config['cip_times'][unit_name],
  78. 'predicted_time': cip_time
  79. }
  80. updated_units.append(f"{unit_name}: {cip_time}")
  81. if updated_units:
  82. with open(config_path, 'w', encoding='utf-8') as f:
  83. json.dump(current_config, f, ensure_ascii=False, indent=2)
  84. print(f"CIP预测时间已保存:")
  85. for update in updated_units:
  86. print(f" {update}")
  87. config = current_config
  88. return True
  89. else:
  90. print("无CIP时间需要保存")
  91. return False
  92. except Exception as e:
  93. print(f"保存CIP预测时间失败: {e}")
  94. return False
  95. def validate_data(data, name="数据"):
  96. """
  97. 验证时间序列数据格式
  98. 检查项:
  99. 1. 数据非空
  100. 2. 索引类型为DatetimeIndex
  101. Args:
  102. data: pd.Series或pd.DataFrame
  103. name: 数据名称,用于错误提示
  104. Returns:
  105. bool: 验证通过返回True
  106. Raises:
  107. ValueError: 验证失败时抛出
  108. """
  109. if data is None or data.empty:
  110. raise ValueError(f"{name}为空或无效")
  111. if not isinstance(data.index, pd.DatetimeIndex):
  112. raise ValueError(f"{name}的索引必须是时间格式")
  113. return True
  114. class OptimalCIPPredictor:
  115. """
  116. CIP最优时机预测器
  117. 工作原理:
  118. 1. 使用滑动窗口计算k值(膜污染速率)
  119. 2. 识别k值连续上升趋势
  120. 3. 在满足时间约束前提下,选择k值最大的时间点
  121. 参数说明:
  122. - window_days: 滑动窗口大小(天),用于线性回归计算k值
  123. - min_continuous_rising: 最小连续上升点数,确保趋势稳定
  124. - min_delay_days: 最小延迟天数,避免过早建议CIP
  125. """
  126. def __init__(self, window_days=7, min_continuous_rising=3, min_delay_days=30):
  127. """
  128. 初始化预测器
  129. Args:
  130. window_days: 滑动窗口天数(默认7天)
  131. min_continuous_rising: 最小连续上升点数(默认3点)
  132. min_delay_days: 最小延迟天数(默认30天)
  133. """
  134. self.window_days = window_days
  135. self.window_hours = window_days * 24 # 转换为小时
  136. self.min_continuous_rising = min_continuous_rising
  137. self.min_delay_days = min_delay_days
  138. print(f"预测器初始化: 窗口={window_days}天, 连续上升>={min_continuous_rising}点, 延迟>={min_delay_days}天")
  139. def calculate_sliding_k_values(self, pressure_series):
  140. """
  141. 计算滑动窗口k值序列
  142. 基于机理模型: ΔP(t) = ΔP₀ + k×t
  143. 通过线性回归计算斜率k,表示膜污染速率
  144. Args:
  145. pressure_series: pd.Series,压差时间序列,索引为时间
  146. Returns:
  147. pd.Series: k值序列,前window_hours个值为NaN
  148. """
  149. # 初始化k值序列,索引与输入数据保持一致
  150. k_values = pd.Series(index=pressure_series.index, dtype=float)
  151. # 滑动窗口遍历,从第window_hours个点开始
  152. for i in range(self.window_hours, len(pressure_series)):
  153. # 取当前窗口内的数据
  154. window_data = pressure_series.iloc[i-self.window_hours:i]
  155. # 数据质量检查:窗口内至少80%的数据有效
  156. if len(window_data) < self.window_hours * 0.8:
  157. continue
  158. # 构造时间点序列 [0, 1, 2, ..., window_hours-1]
  159. time_points = np.arange(len(window_data)).reshape(-1, 1)
  160. try:
  161. # 线性回归拟合:y = a + k*x,取斜率k作为污染速率
  162. model = LinearRegression()
  163. model.fit(time_points, window_data.values)
  164. k = model.coef_[0]
  165. k_values.iloc[i] = k
  166. except:
  167. # 回归失败时跳过该点
  168. continue
  169. return k_values
  170. def find_continuous_rising_periods(self, k_values):
  171. """
  172. 识别k值连续上升的时间段
  173. 遍历k值序列,找出所有连续上升的区间
  174. 只保留持续时间大于等于min_continuous_rising的区间
  175. Args:
  176. k_values: pd.Series,k值序列
  177. Returns:
  178. list: 连续上升时间段列表,格式 [(start_idx, end_idx, duration), ...]
  179. """
  180. rising_periods = []
  181. start_idx = None # 当前上升段的起始索引
  182. # 遍历k值序列,寻找连续上升段
  183. for i in range(1, len(k_values)):
  184. # 跳过缺失值
  185. if pd.isna(k_values.iloc[i]) or pd.isna(k_values.iloc[i-1]):
  186. start_idx = None
  187. continue
  188. # 判断k值是否上升
  189. if k_values.iloc[i] > k_values.iloc[i-1]:
  190. # 开始新的上升段
  191. if start_idx is None:
  192. start_idx = i-1
  193. else:
  194. # k值不再上升,结束当前上升段
  195. if start_idx is not None:
  196. duration = i - start_idx
  197. # 只保留持续时间足够长的上升段
  198. if duration >= self.min_continuous_rising:
  199. rising_periods.append((start_idx, i-1, duration))
  200. start_idx = None
  201. # 处理序列末尾的上升趋势
  202. if start_idx is not None:
  203. duration = len(k_values) - start_idx # 持续时间
  204. if duration >= self.min_continuous_rising: # 持续时间足够长
  205. rising_periods.append((start_idx, len(k_values)-1, duration)) # 添加上升时间段
  206. return rising_periods
  207. def find_optimal_cip_time(self, pressure_series):
  208. """
  209. 最优CIP时机
  210. 核心步骤:
  211. 1. 计算滑动窗口k值(膜污染速率)
  212. 2. 识别k值连续上升的时间段
  213. 3. 应用时间约束(距离起点至少min_delay_days天)
  214. 4. 在有效时间段内选择k值最大的时间点
  215. Args:
  216. pressure_series: pd.Series,压差时间序列
  217. Returns:
  218. tuple: (optimal_time, analysis_result)
  219. - optimal_time: pd.Timestamp,最优CIP时间,失败时返回None
  220. - analysis_result: dict,分析结果详情
  221. """
  222. # 步骤1:计算滑动k值
  223. k_values = self.calculate_sliding_k_values(pressure_series)
  224. valid_k_count = k_values.dropna().shape[0]
  225. # 检查:k值数量是否足够
  226. if valid_k_count < 10:
  227. return None, {"error": "有效k值数量不足"}
  228. # 步骤2:识别连续上升时间段
  229. rising_periods = self.find_continuous_rising_periods(k_values)
  230. if not rising_periods:
  231. return None, {"error": "未发现连续上升趋势"}
  232. # 步骤3:应用时间约束,筛选有效时间段
  233. min_delay_time = pressure_series.index[0] + timedelta(days=self.min_delay_days)
  234. valid_periods = []
  235. for start_idx, end_idx, duration in rising_periods:
  236. period_start_time = pressure_series.index[start_idx]
  237. period_end_time = pressure_series.index[end_idx]
  238. # 检查时间段是否在约束范围内
  239. if period_end_time >= min_delay_time:
  240. if period_start_time < min_delay_time:
  241. # 时间段部分在约束范围内,截取有效部分
  242. delay_idx = pressure_series.index.get_indexer([min_delay_time], method='nearest')[0]
  243. if delay_idx <= end_idx:
  244. valid_periods.append((delay_idx, end_idx, end_idx - delay_idx + 1))
  245. else:
  246. # 时间段完全在约束范围内
  247. valid_periods.append((start_idx, end_idx, duration))
  248. if not valid_periods:
  249. return None, {"error": f"无满足时间约束的上升趋势(需>={self.min_delay_days}天后)"}
  250. # 步骤4:在有效时间段内寻找k值最大的点
  251. best_time = None
  252. best_k = -np.inf
  253. for start_idx, end_idx, duration in valid_periods:
  254. period_k_values = k_values.iloc[start_idx:end_idx+1]
  255. max_k_idx = period_k_values.idxmax() # k值最大点的索引
  256. max_k_value = period_k_values.max() # k值最大值
  257. if max_k_value > best_k:
  258. best_k = max_k_value
  259. best_time = max_k_idx
  260. # 构建分析结果
  261. analysis_result = {
  262. "success": True,
  263. "delay_days": (best_time - pressure_series.index[0]).days,
  264. "best_k": float(best_k)
  265. }
  266. return best_time, analysis_result
  267. def select_optimal_cip_strategy_1(cip_results):
  268. """
  269. 策略1:最早时机策略
  270. 选择逻辑:取一段和二段中较早需要CIP的时机
  271. 适用场景:保守运维,及时维护
  272. """
  273. if not cip_results:
  274. return None, "无有效CIP时机"
  275. earliest_result = min(cip_results, key=lambda x: x['delay_days'])
  276. return earliest_result['time'], f"最早时机策略 - {earliest_result['column']} (第{earliest_result['delay_days']}天)"
  277. def select_optimal_cip_strategy_2(cip_results):
  278. """
  279. 策略2:最晚时机策略
  280. 选择逻辑:取一段和二段中较晚需要CIP的时机
  281. 适用场景:最大化运行时间
  282. """
  283. if not cip_results:
  284. return None, "无有效CIP时机"
  285. latest_result = max(cip_results, key=lambda x: x['delay_days'])
  286. return latest_result['time'], f"最晚时机策略 - {latest_result['column']} (第{latest_result['delay_days']}天)"
  287. def select_optimal_cip_strategy_3(cip_results):
  288. """
  289. 策略3:加权平均策略(推荐)
  290. 选择逻辑:根据k值对各段CIP时机加权,污染严重段权重更大
  291. 适用场景:平衡运行时间和维护需求
  292. """
  293. if not cip_results:
  294. return None, "无有效CIP时机"
  295. if len(cip_results) == 1:
  296. result = cip_results[0]
  297. return result['time'], f"单段加权策略 - {result['column']} (第{result['delay_days']}天)"
  298. # 计算加权平均天数
  299. total_weight = sum(result['k_value'] for result in cip_results)
  300. weighted_days = sum(result['delay_days'] * result['k_value'] for result in cip_results) / total_weight
  301. # 找最接近加权平均天数的时机
  302. target_days = int(round(weighted_days))
  303. closest_result = min(cip_results, key=lambda x: abs(x['delay_days'] - target_days))
  304. return closest_result['time'], f"加权平均策略 - {closest_result['column']} (目标第{target_days}天,实际第{closest_result['delay_days']}天)"
  305. def select_optimal_cip_strategy_4(cip_results):
  306. """
  307. 策略4:污染严重程度策略
  308. 选择逻辑:选择k值最大(污染最严重)的段的CIP时机
  309. 适用场景:基于实际污染状况决策
  310. """
  311. if not cip_results:
  312. return None, "无有效CIP时机"
  313. max_k_result = max(cip_results, key=lambda x: x['k_value'])
  314. return max_k_result['time'], f"污染严重程度策略 - {max_k_result['column']} (k值={max_k_result['k_value']:.6f}, 第{max_k_result['delay_days']}天)"
  315. def select_optimal_cip_time(cip_results, strategy=1):
  316. """
  317. 根据指定策略选择最优CIP时机
  318. Args:
  319. cip_results: list,各段CIP分析结果列表,每个元素包含time、delay_days、k_value等字段
  320. strategy: int,策略编号,1-4分别对应不同策略
  321. Returns:
  322. tuple: (optimal_time, description)
  323. - optimal_time: pd.Timestamp,最优CIP时间
  324. - description: str,策略描述
  325. Raises:
  326. ValueError: 当策略编号无效时抛出
  327. """
  328. strategy_map = {
  329. 1: select_optimal_cip_strategy_1, # 最早时机
  330. 2: select_optimal_cip_strategy_2, # 最晚时机
  331. 3: select_optimal_cip_strategy_3, # 加权平均(推荐)
  332. 4: select_optimal_cip_strategy_4 # 污染严重程度
  333. }
  334. if strategy not in strategy_map:
  335. raise ValueError(f"无效策略编号: {strategy},支持的策略: 1-4")
  336. return strategy_map[strategy](cip_results)
  337. def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
  338. """
  339. 分析RO机组的最优CIP时间
  340. 功能:
  341. 1. 获取压差预测数据
  342. 2. 分析各机组各段的CIP时机
  343. 3. 根据策略选择最优CIP时间
  344. Args:
  345. strategy: int,CIP时机选择策略(1-4)
  346. 1: 最早时机策略
  347. 2: 最晚时机策略
  348. 3: 加权平均策略(推荐)
  349. 4: 污染严重程度策略
  350. start_date: str,预测起始时间,格式'YYYY-MM-DD HH:MM:SS',默认使用当前时间
  351. unit_filter: str,指定分析的机组,如'RO1',默认分析所有机组
  352. Returns:
  353. pd.DataFrame: 包含机组类型、CIP时机、策略说明的结果表
  354. """
  355. # 初始化日志记录器
  356. logger = CIPAnalysisLogger()
  357. try:
  358. # 获取预测数据
  359. print("获取预测数据...")
  360. try:
  361. all_data = Predictor().predict(start_date=start_date) # 获取预测数据
  362. if all_data.empty:
  363. logger.logger.error("预测数据为空")
  364. return pd.DataFrame()
  365. except Exception as e:
  366. logger.logger.error(f"获取预测数据失败: {e}")
  367. return pd.DataFrame()
  368. # 将date列设置为索引
  369. all_data = all_data.set_index('date')
  370. # 获取预测数据的起始时间
  371. prediction_start_date = all_data.index[0].to_pydatetime()
  372. print(f"预测数据起始时间: {prediction_start_date.strftime('%Y-%m-%d %H:%M:%S')}")
  373. # 记录输入参数和预测数据
  374. logger.log_input_parameters(strategy, start_date, prediction_start_date)
  375. logger.log_prediction_data(all_data)
  376. # 显示配置的CIP时间状态
  377. if config and 'cip_times' in config:
  378. print(f"\n当前配置的CIP时间状态:")
  379. for unit_name, cip_data in config['cip_times'].items():
  380. if not unit_name.startswith('_'):
  381. if isinstance(cip_data, dict):
  382. actual = cip_data.get('actual_time', 'N/A')
  383. predicted = cip_data.get('predicted_time', 'N/A')
  384. print(f" {unit_name}: 实际={actual}, 预测={predicted}")
  385. else:
  386. print(f" {unit_name}: {cip_data}")
  387. # 确定要分析的机组
  388. if unit_filter:
  389. unit_ids = [int(unit_filter.replace('RO', ''))]
  390. else:
  391. unit_ids = [1, 2, 3, 4]
  392. # 获取各机组的预测天数
  393. from cip.run_this import main as get_unit_days
  394. unit_days_dict = {}
  395. for unit_id in unit_ids:
  396. unit_days_dict[unit_id] = get_unit_days(unit_id, prediction_start_date)
  397. # 记录机组预测天数
  398. logger.log_unit_days(unit_days_dict)
  399. # 初始化预测器
  400. predictor = OptimalCIPPredictor(window_days=7, min_continuous_rising=3, min_delay_days=30)
  401. # 策略说明
  402. strategy_names = {
  403. 1: "最早时机策略",
  404. 2: "最晚时机策略",
  405. 3: "加权平均策略",
  406. 4: "污染严重程度策略"
  407. }
  408. print(f"\n使用策略: {strategy_names.get(strategy, '未知策略')}")
  409. # 存储分析结果
  410. results = []
  411. # 遍历分析各机组
  412. for unit_id in unit_ids:
  413. print(f"\n分析机组 RO{unit_id}")
  414. # 获取该机组的预测天数
  415. predict_days = unit_days_dict[unit_id]
  416. # 记录分析开始
  417. logger.log_unit_analysis_start(unit_id, predict_days)
  418. # 截取预测天数范围内的数据
  419. end_time = all_data.index[0] + timedelta(days=predict_days)
  420. truncated_data = all_data.loc[all_data.index <= end_time]
  421. # 筛选该机组的压差列
  422. ro_name = f"RO{unit_id}"
  423. pressure_columns = [col for col in truncated_data.columns if ro_name in col and 'DPT' in col and 'pred' in col]
  424. if not pressure_columns:
  425. print(f"警告: 未找到{ro_name}的压差列")
  426. continue
  427. print(f"找到{ro_name}压差列: {len(pressure_columns)}个")
  428. # 记录压差数据
  429. logger.log_unit_pressure_data(unit_id, truncated_data, pressure_columns)
  430. # 收集各段的CIP分析结果
  431. cip_results = []
  432. for column in pressure_columns:
  433. pressure_series = truncated_data[column].dropna()
  434. pressure_series.name = column
  435. # 数据点数检查:至少需要30天数据
  436. if len(pressure_series) < 30 * 24:
  437. continue
  438. try:
  439. # 寻找最优CIP时机
  440. optimal_time, analysis = predictor.find_optimal_cip_time(pressure_series)
  441. # 记录分析结果
  442. logger.log_cip_analysis_result(unit_id, column, optimal_time, analysis)
  443. if optimal_time:
  444. cip_results.append({
  445. 'column': column,
  446. 'time': optimal_time,
  447. 'delay_days': analysis['delay_days'],
  448. 'k_value': analysis['best_k']
  449. })
  450. print(f" {column}: {optimal_time} (第{analysis['delay_days']}天, k={analysis['best_k']:.6f})")
  451. else:
  452. print(f" {column}: 未找到CIP时机 - {analysis.get('error', '未知原因')}")
  453. except Exception as e:
  454. print(f" {column}: 分析失败 - {str(e)}")
  455. logger.log_cip_analysis_result(unit_id, column, None, {"error": str(e)})
  456. # 根据策略选择最优CIP时机
  457. if cip_results:
  458. optimal_time, strategy_desc = select_optimal_cip_time(cip_results, strategy)
  459. results.append({
  460. '机组类型': f"RO{unit_id}",
  461. 'CIP时机': optimal_time,
  462. '策略说明': strategy_desc
  463. })
  464. print(f"RO{unit_id}最优CIP时机: {optimal_time}")
  465. print(f"策略: {strategy_desc}")
  466. logger.log_unit_strategy_result(unit_id, optimal_time, strategy_desc)
  467. else:
  468. results.append({
  469. '机组类型': f"RO{unit_id}",
  470. 'CIP时机': None,
  471. '策略说明': "无有效CIP时机"
  472. })
  473. print(f"RO{unit_id}: 未找到有效CIP时机")
  474. logger.log_unit_strategy_result(unit_id, None, "无有效CIP时机")
  475. # 生成结果DataFrame
  476. result_df = pd.DataFrame(results)
  477. # 记录最终结果
  478. logger.log_final_results(result_df)
  479. # 生成分析图表
  480. logger.create_analysis_plots(all_data, unit_days_dict)
  481. print("\n" + "="*50)
  482. print("分析完成")
  483. print("="*50)
  484. print(result_df.to_string(index=False))
  485. return result_df
  486. except Exception as e:
  487. logger.logger.error(f"分析过程中发生错误: {e}")
  488. raise
  489. finally:
  490. # 确保日志记录器正确关闭
  491. logger.close()
  492. def main(strategy=3, start_date=None, unit_filter=None):
  493. """
  494. 主执行函数
  495. 功能:执行RO机组CIP时机分析并发送结果到回调接口
  496. Args:
  497. strategy: int,CIP时机选择策略(1-4),默认3(加权平均策略)
  498. 1: 最早时机策略
  499. 2: 最晚时机策略
  500. 3: 加权平均策略
  501. 4: 污染严重程度策略
  502. start_date: str,预测起始时间,格式'YYYY-MM-DD HH:MM:SS',默认None(使用当前时间)
  503. unit_filter: str,指定预测的机组,如'RO1',默认None(预测所有机组)
  504. Returns:
  505. pd.DataFrame: 分析结果
  506. 示例:
  507. result_df = main() # 使用默认参数
  508. result_df = main(start_date='2025-07-01 00:00:00') # 指定时间
  509. result_df = main(strategy=1, unit_filter='RO1') # 指定策略和机组
  510. """
  511. print("RO膜污染监控与CIP预测")
  512. print("=" * 50)
  513. # 执行分析
  514. result_df = analyze_ro_unit_cip_timing(strategy=strategy, start_date=start_date, unit_filter=unit_filter)
  515. # 发送回调
  516. if config and not result_df.empty:
  517. print("\n发送决策结果...")
  518. callback_success = send_decision_to_callback(result_df)
  519. if callback_success:
  520. print("决策结果发送成功")
  521. else:
  522. print("决策结果发送失败")
  523. return result_df
  524. def send_decision_to_callback(decision_data):
  525. """
  526. 将CIP决策结果发送到回调接口
  527. 功能:将分析结果按照API格式封装,通过HTTP POST发送到回调地址
  528. Args:
  529. decision_data: pd.DataFrame,决策数据,包含机组类型和CIP时机
  530. Returns:
  531. bool: 发送成功返回True,失败返回False
  532. """
  533. if config is None:
  534. print("配置文件未加载")
  535. return False
  536. try:
  537. # 构建回调URL
  538. callback_url = config['api']['base_url'] + config['api']['callback_endpoint']
  539. # 设置请求头
  540. headers = {
  541. "Content-Type": "application/json",
  542. "JWT-TOKEN": config['api']['jwt_token']
  543. }
  544. # 获取项目ID
  545. project_id = config['scada']['project_id']
  546. # 构造回调数据
  547. callback_list = []
  548. if isinstance(decision_data, pd.DataFrame):
  549. for _, row in decision_data.iterrows():
  550. if pd.notna(row["CIP时机"]):
  551. callback_list.append({
  552. "type": row["机组类型"],
  553. "project_id": project_id,
  554. "ctime": row["CIP时机"].strftime("%Y-%m-%d %H:%M:%S")
  555. })
  556. else:
  557. callback_list = [decision_data]
  558. # 封装为API要求的格式
  559. payload = {
  560. "list": callback_list
  561. }
  562. # 日志:显示待发送数据
  563. log_type = callback_list[0]["type"] if callback_list else "UNKNOWN"
  564. print(f"[{log_type}] 准备发送决策数据:")
  565. print(f"{json.dumps(payload, indent=2, ensure_ascii=False)}")
  566. # 发送HTTP请求(带重试机制)
  567. max_retries = 3
  568. retry_interval = 10
  569. for attempt in range(1, max_retries + 1):
  570. try:
  571. print(f"[{log_type}] 第{attempt}/{max_retries}次尝试发送...")
  572. response = requests.post(callback_url, headers=headers, json=payload, timeout=15)
  573. response.raise_for_status()
  574. print(f"[{log_type}] 决策数据发送成功,服务器响应: {response.text}")
  575. return True
  576. except requests.exceptions.RequestException as e:
  577. print(f"[{log_type}] 发送失败: {e}")
  578. if attempt < max_retries:
  579. print(f"[{log_type}] {retry_interval}秒后重试...")
  580. time.sleep(retry_interval)
  581. print(f"[{log_type}] 所有重试均失败")
  582. return False
  583. except Exception as e:
  584. print(f"构建回调数据时出错: {e}")
  585. return False
  586. if __name__ == '__main__':
  587. # 示例调用
  588. # main() # 使用当前时间
  589. main(start_date='2025-08-26 00:00:00', unit_filter='RO1') # 使用历史时间