main_simple.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
  1. # -*- coding: utf-8 -*-
  2. """
  3. RO膜污染监控与CIP预测 - 基于预测数据的最优时机分析
  4. 核心功能:分析RO膜压差预测数据,计算最优CIP清洗时机
  5. CIP输出模式:
  6. 1. 分段输出模式(separate_stages=True,默认):
  7. - 分别输出每个段的独立CIP时机
  8. - 例如:RO1-一段: 2025-10-15 10:00:00, RO1-二段: 2025-10-20 15:00:00
  9. 2. 合并输出模式(separate_stages=False,保留以备后用):
  10. - 使用策略合并输出一个CIP时机
  11. - 策略1: 最早时机策略 - 一段或二段任一需要CIP时即触发
  12. - 策略2: 最晚时机策略 - 等待所有段都需要CIP时触发
  13. - 策略3: 加权平均策略 - 综合两段污染程度,污染严重段权重更大
  14. - 策略4: 污染严重程度策略 - 基于k值最大的段决策
  15. 使用方法:
  16. main() # 使用默认参数(分段输出模式)
  17. main(separate_stages=False, strategy=3) # 使用合并模式+策略3
  18. main(start_date='2025-08-26 00:00:00') # 指定起始时间
  19. main(unit_filter='RO1') # 只分析RO1
  20. """
  21. import pandas as pd
  22. import numpy as np
  23. from sklearn.linear_model import LinearRegression
  24. from fouling_model_0922.predict import Predictor
  25. from get_api_data import get_sensor_data
  26. import warnings
  27. from datetime import datetime, timedelta
  28. from logging_system import CIPAnalysisLogger
  29. import json
  30. import requests
  31. import time
  32. import os
  33. warnings.filterwarnings('ignore', category=FutureWarning)
  34. # 加载配置文件
  35. def load_config():
  36. """
  37. 加载配置文件
  38. Returns:
  39. dict: 配置字典,失败时返回None
  40. """
  41. config_path = os.path.join(os.path.dirname(__file__), 'config.json')
  42. try:
  43. with open(config_path, 'r', encoding='utf-8') as f:
  44. config = json.load(f)
  45. print(f"配置文件加载成功: {config_path}")
  46. return config
  47. except Exception as e:
  48. print(f"配置文件加载失败: {e}")
  49. return None
  50. # 加载配置
  51. config = load_config()
  52. # 设置请求头
  53. headers = {
  54. "Content-Type": "application/json",
  55. "JWT-TOKEN": config['api']['jwt_token']
  56. }
  57. # 构建回调URL
  58. callback_url = config['api']['base_url'] + config['api']['callback_endpoint']
  59. history_url = config['api']['API_History_URL']
  60. def update_cip_history_in_config(result_df):
  61. """
  62. 保存CIP预测结果到配置文件
  63. 功能:将预测的CIP时机写入config.json的cip_times字段的predicted_time
  64. Args:
  65. result_df: DataFrame,包含机组类型和CIP时机两列
  66. Returns:
  67. bool: 保存成功返回True,失败返回False
  68. 注意:
  69. 此函数已废弃,smart_monitor会自动保存predicted_time
  70. 保留此函数仅为兼容性
  71. """
  72. global config
  73. if config is None:
  74. print("配置文件未加载")
  75. return False
  76. try:
  77. config_path = os.path.join(os.path.dirname(__file__), 'config.json')
  78. with open(config_path, 'r', encoding='utf-8') as f:
  79. current_config = json.load(f)
  80. # 遍历结果,写入配置
  81. updated_units = []
  82. for _, row in result_df.iterrows():
  83. if pd.notna(row["CIP时机"]):
  84. unit_name = row["机组类型"]
  85. cip_time = row["CIP时机"].strftime('%Y-%m-%d %H:%M:%S')
  86. if unit_name in current_config.get('cip_times', {}):
  87. # 新格式:只更新predicted_time
  88. if isinstance(current_config['cip_times'][unit_name], dict):
  89. current_config['cip_times'][unit_name]['predicted_time'] = cip_time
  90. else:
  91. # 兼容旧格式:转换为新格式
  92. current_config['cip_times'][unit_name] = {
  93. 'actual_time': current_config['cip_times'][unit_name],
  94. 'predicted_time': cip_time
  95. }
  96. updated_units.append(f"{unit_name}: {cip_time}")
  97. if updated_units:
  98. with open(config_path, 'w', encoding='utf-8') as f:
  99. json.dump(current_config, f, ensure_ascii=False, indent=2)
  100. print(f"CIP预测时间已保存:")
  101. for update in updated_units:
  102. print(f" {update}")
  103. config = current_config
  104. return True
  105. else:
  106. print("无CIP时间需要保存")
  107. return False
  108. except Exception as e:
  109. print(f"保存CIP预测时间失败: {e}")
  110. return False
  111. def validate_data(data, name="数据"):
  112. """
  113. 验证时间序列数据格式
  114. 检查项:
  115. 1. 数据非空
  116. 2. 索引类型为DatetimeIndex
  117. Args:
  118. data: pd.Series或pd.DataFrame
  119. name: 数据名称,用于错误提示
  120. Returns:
  121. bool: 验证通过返回True
  122. Raises:
  123. ValueError: 验证失败时抛出
  124. """
  125. if data is None or data.empty:
  126. raise ValueError(f"{name}为空或无效")
  127. if not isinstance(data.index, pd.DatetimeIndex):
  128. raise ValueError(f"{name}的索引必须是时间格式")
  129. return True
  130. class OptimalCIPPredictor:
  131. """
  132. CIP最优时机预测器
  133. 工作原理:
  134. 1. 使用滑动窗口计算k值(膜污染速率)
  135. 2. 识别k值连续上升趋势
  136. 3. 在满足时间约束前提下,选择k值最大的时间点
  137. 参数说明:
  138. - window_days: 滑动窗口大小(天),用于线性回归计算k值
  139. - min_continuous_rising: 最小连续上升点数,确保趋势稳定
  140. - min_delay_days: 最小延迟天数,避免过早建议CIP
  141. """
  142. def __init__(self, window_days=7, min_continuous_rising=3, min_delay_days=30):
  143. """
  144. 初始化预测器
  145. Args:
  146. window_days: 滑动窗口天数(默认7天)
  147. min_continuous_rising: 最小连续上升点数(默认3点)
  148. min_delay_days: 最小延迟天数(默认30天)
  149. """
  150. self.window_days = window_days
  151. self.window_hours = window_days * 24 # 转换为小时
  152. self.min_continuous_rising = min_continuous_rising
  153. self.min_delay_days = min_delay_days
  154. print(f"预测器初始化: 窗口={window_days}天, 连续上升>={min_continuous_rising}点, 延迟>={min_delay_days}天")
  155. def calculate_sliding_k_values(self, pressure_series):
  156. """
  157. 计算滑动窗口k值序列
  158. 基于机理模型: ΔP(t) = ΔP₀ + k×t
  159. 通过线性回归计算斜率k,表示膜污染速率
  160. Args:
  161. pressure_series: pd.Series,压差时间序列,索引为时间
  162. Returns:
  163. pd.Series: k值序列,前window_hours个值为NaN
  164. """
  165. # 初始化k值序列,索引与输入数据保持一致
  166. k_values = pd.Series(index=pressure_series.index, dtype=float)
  167. # 滑动窗口遍历,从第window_hours个点开始
  168. for i in range(self.window_hours, len(pressure_series)):
  169. # 取当前窗口内的数据
  170. window_data = pressure_series.iloc[i-self.window_hours:i]
  171. # 数据质量检查:窗口内至少80%的数据有效
  172. if len(window_data) < self.window_hours * 0.8:
  173. continue
  174. # 构造时间点序列 [0, 1, 2, ..., window_hours-1]
  175. time_points = np.arange(len(window_data)).reshape(-1, 1)
  176. try:
  177. # 线性回归拟合:y = a + k*x,取斜率k作为污染速率
  178. model = LinearRegression()
  179. model.fit(time_points, window_data.values)
  180. k = model.coef_[0]
  181. k_values.iloc[i] = k
  182. except:
  183. # 回归失败时跳过该点
  184. continue
  185. return k_values
  186. def find_continuous_rising_periods(self, k_values):
  187. """
  188. 识别k值连续上升的时间段
  189. 遍历k值序列,找出所有连续上升的区间
  190. 只保留持续时间大于等于min_continuous_rising的区间
  191. Args:
  192. k_values: pd.Series,k值序列
  193. Returns:
  194. list: 连续上升时间段列表,格式 [(start_idx, end_idx, duration), ...]
  195. """
  196. rising_periods = []
  197. start_idx = None # 当前上升段的起始索引
  198. # 遍历k值序列,寻找连续上升段
  199. for i in range(1, len(k_values)):
  200. # 跳过缺失值
  201. if pd.isna(k_values.iloc[i]) or pd.isna(k_values.iloc[i-1]):
  202. start_idx = None
  203. continue
  204. # 判断k值是否上升
  205. if k_values.iloc[i] > k_values.iloc[i-1]:
  206. # 开始新的上升段
  207. if start_idx is None:
  208. start_idx = i-1
  209. else:
  210. # k值不再上升,结束当前上升段
  211. if start_idx is not None:
  212. duration = i - start_idx
  213. # 只保留持续时间足够长的上升段
  214. if duration >= self.min_continuous_rising:
  215. rising_periods.append((start_idx, i-1, duration))
  216. start_idx = None
  217. # 处理序列末尾的上升趋势
  218. if start_idx is not None:
  219. duration = len(k_values) - start_idx # 持续时间
  220. if duration >= self.min_continuous_rising: # 持续时间足够长
  221. rising_periods.append((start_idx, len(k_values)-1, duration)) # 添加上升时间段
  222. return rising_periods
  223. def find_optimal_cip_time(self, pressure_series):
  224. """
  225. 最优CIP时机
  226. 核心步骤:
  227. 1. 计算滑动窗口k值(膜污染速率)
  228. 2. 识别k值连续上升的时间段
  229. 3. 应用时间约束(距离起点至少min_delay_days天)
  230. 4. 在有效时间段内选择k值最大的时间点
  231. Args:
  232. pressure_series: pd.Series,压差时间序列
  233. Returns:
  234. tuple: (optimal_time, analysis_result)
  235. - optimal_time: pd.Timestamp,最优CIP时间,失败时返回None
  236. - analysis_result: dict,分析结果详情
  237. """
  238. # 步骤1:计算滑动k值
  239. k_values = self.calculate_sliding_k_values(pressure_series)
  240. valid_k_count = k_values.dropna().shape[0]
  241. # 检查:k值数量是否足够
  242. if valid_k_count < 10:
  243. return None, {
  244. "error": "有效k值数量不足",
  245. "valid_k_count": valid_k_count,
  246. "required": 10
  247. }
  248. # 步骤2:识别连续上升时间段
  249. rising_periods = self.find_continuous_rising_periods(k_values)
  250. if not rising_periods:
  251. return None, {
  252. "error": "未发现连续上升趋势",
  253. "valid_k_count": valid_k_count,
  254. "min_continuous_rising": self.min_continuous_rising,
  255. "hint": "k值没有持续上升趋势,可能膜污染较稳定"
  256. }
  257. # 步骤3:应用时间约束,筛选有效时间段
  258. min_delay_time = pressure_series.index[0] + timedelta(days=self.min_delay_days)
  259. valid_periods = []
  260. for start_idx, end_idx, duration in rising_periods:
  261. period_start_time = pressure_series.index[start_idx]
  262. period_end_time = pressure_series.index[end_idx]
  263. # 检查时间段是否在约束范围内
  264. if period_end_time >= min_delay_time:
  265. if period_start_time < min_delay_time:
  266. # 时间段部分在约束范围内,截取有效部分
  267. delay_idx = pressure_series.index.get_indexer([min_delay_time], method='nearest')[0]
  268. if delay_idx <= end_idx:
  269. valid_periods.append((delay_idx, end_idx, end_idx - delay_idx + 1))
  270. else:
  271. # 时间段完全在约束范围内
  272. valid_periods.append((start_idx, end_idx, duration))
  273. if not valid_periods:
  274. return None, {
  275. "error": f"无满足时间约束的上升趋势(需>={self.min_delay_days}天后)",
  276. "rising_periods_count": len(rising_periods),
  277. "min_delay_days": self.min_delay_days,
  278. "data_days": (pressure_series.index[-1] - pressure_series.index[0]).days,
  279. "hint": f"发现{len(rising_periods)}个上升趋势,但都在前{self.min_delay_days}天内"
  280. }
  281. # 步骤4:在有效时间段内寻找k值最大的点
  282. best_time = None
  283. best_k = -np.inf
  284. for start_idx, end_idx, duration in valid_periods:
  285. period_k_values = k_values.iloc[start_idx:end_idx+1]
  286. max_k_idx = period_k_values.idxmax() # k值最大点的索引
  287. max_k_value = period_k_values.max() # k值最大值
  288. if max_k_value > best_k:
  289. best_k = max_k_value
  290. best_time = max_k_idx
  291. # 构建分析结果
  292. analysis_result = {
  293. "success": True,
  294. "delay_days": (best_time - pressure_series.index[0]).days,
  295. "best_k": float(best_k),
  296. "valid_periods_count": len(valid_periods)
  297. }
  298. return best_time, analysis_result
  299. def select_optimal_cip_strategy_1(cip_results):
  300. """
  301. 策略1:最早时机策略
  302. 选择逻辑:取一段和二段中较早需要CIP的时机
  303. 适用场景:保守运维,及时维护
  304. """
  305. if not cip_results:
  306. return None, "无有效CIP时机"
  307. earliest_result = min(cip_results, key=lambda x: x['delay_days'])
  308. return earliest_result['time'], f"最早时机策略 - {earliest_result['column']} (第{earliest_result['delay_days']}天)"
  309. def select_optimal_cip_strategy_2(cip_results):
  310. """
  311. 策略2:最晚时机策略
  312. 选择逻辑:取一段和二段中较晚需要CIP的时机
  313. 适用场景:最大化运行时间
  314. """
  315. if not cip_results:
  316. return None, "无有效CIP时机"
  317. latest_result = max(cip_results, key=lambda x: x['delay_days'])
  318. return latest_result['time'], f"最晚时机策略 - {latest_result['column']} (第{latest_result['delay_days']}天)"
  319. def select_optimal_cip_strategy_3(cip_results):
  320. """
  321. 策略3:加权平均策略(推荐)
  322. 选择逻辑:根据k值对各段CIP时机加权,污染严重段权重更大
  323. 适用场景:平衡运行时间和维护需求
  324. """
  325. if not cip_results:
  326. return None, "无有效CIP时机"
  327. if len(cip_results) == 1:
  328. result = cip_results[0]
  329. return result['time'], f"单段加权策略 - {result['column']} (第{result['delay_days']}天)"
  330. # 计算加权平均天数
  331. total_weight = sum(result['k_value'] for result in cip_results)
  332. weighted_days = sum(result['delay_days'] * result['k_value'] for result in cip_results) / total_weight
  333. # 找最接近加权平均天数的时机
  334. target_days = int(round(weighted_days))
  335. closest_result = min(cip_results, key=lambda x: abs(x['delay_days'] - target_days))
  336. return closest_result['time'], f"加权平均策略 - {closest_result['column']} (目标第{target_days}天,实际第{closest_result['delay_days']}天)"
  337. def select_optimal_cip_strategy_4(cip_results):
  338. """
  339. 策略4:污染严重程度策略
  340. 选择逻辑:选择k值最大(污染最严重)的段的CIP时机
  341. 适用场景:基于实际污染状况决策
  342. """
  343. if not cip_results:
  344. return None, "无有效CIP时机"
  345. max_k_result = max(cip_results, key=lambda x: x['k_value'])
  346. return max_k_result['time'], f"污染严重程度策略 - {max_k_result['column']} (k值={max_k_result['k_value']:.6f}, 第{max_k_result['delay_days']}天)"
  347. def extract_stage_name(column_name):
  348. """
  349. 从列名中提取段号信息
  350. Args:
  351. column_name: str,列名,例如'C.M.RO1_DB@DPT_1_pred'或'C.M.RO2_DB@DPT_2_pred'
  352. Returns:
  353. str: 段号名称,例如'一段'或'二段'
  354. """
  355. if 'DPT_1' in column_name:
  356. return '一段'
  357. elif 'DPT_2' in column_name:
  358. return '二段'
  359. else:
  360. return '未知段'
  361. def select_optimal_cip_time(cip_results, strategy=1):
  362. """
  363. 根据指定策略选择最优CIP时机
  364. Args:
  365. cip_results: list,各段CIP分析结果列表,每个元素包含time、delay_days、k_value等字段
  366. strategy: int,策略编号,1-4分别对应不同策略
  367. Returns:
  368. tuple: (optimal_time, description)
  369. - optimal_time: pd.Timestamp,最优CIP时间
  370. - description: str,策略描述
  371. Raises:
  372. ValueError: 当策略编号无效时抛出
  373. """
  374. strategy_map = {
  375. 1: select_optimal_cip_strategy_1, # 最早时机
  376. 2: select_optimal_cip_strategy_2, # 最晚时机
  377. 3: select_optimal_cip_strategy_3, # 加权平均(推荐)
  378. 4: select_optimal_cip_strategy_4 # 污染严重程度
  379. }
  380. if strategy not in strategy_map:
  381. raise ValueError(f"无效策略编号: {strategy},支持的策略: 1-4")
  382. return strategy_map[strategy](cip_results)
  383. def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None, separate_stages=True):
  384. """
  385. 分析RO机组的最优CIP时间
  386. 功能:
  387. 1. 获取压差预测数据
  388. 2. 分析各机组各段的CIP时机
  389. 3. 根据策略选择最优CIP时间
  390. Args:
  391. strategy: int,CIP时机选择策略(1-4)
  392. 1: 最早时机策略
  393. 2: 最晚时机策略
  394. 3: 加权平均策略(推荐)
  395. 4: 污染严重程度策略
  396. start_date: str,预测起始时间,格式'YYYY-MM-DD HH:MM:SS',默认使用当前时间
  397. unit_filter: str,指定分析的机组,如'RO1',默认分析所有机组
  398. separate_stages: bool,是否分段输出一段和二段的独立CIP时机
  399. True: 分别输出每个段的CIP时机(例如:RO1-一段、RO1-二段)
  400. False: 使用策略合并输出一个CIP时机(例如:RO1)
  401. Returns:
  402. pd.DataFrame: 包含机组类型、CIP时机、策略说明的结果表
  403. """
  404. # 初始化日志记录器(传入unit_filter用于目录命名)
  405. logger = CIPAnalysisLogger(unit_filter=unit_filter)
  406. try:
  407. # 获取预测数据
  408. try:
  409. # 获取 前180天的数据
  410. df = get_sensor_data(start_date, history_url, headers)
  411. #预测
  412. all_data = Predictor().predict(df =df)
  413. if all_data.empty:
  414. logger.logger.error("预测数据为空")
  415. return pd.DataFrame()
  416. except Exception as e:
  417. logger.logger.error(f"获取预测数据失败: {e}")
  418. return pd.DataFrame()
  419. # 将date列设置为索引
  420. all_data = all_data.set_index('date')
  421. # 获取预测数据的起始时间
  422. prediction_start_date = all_data.index[0].to_pydatetime()
  423. print(f"预测起始: {prediction_start_date.strftime('%Y-%m-%d %H:%M:%S')}")
  424. # 记录输入参数和预测数据
  425. logger.log_input_parameters(strategy, start_date, prediction_start_date)
  426. logger.log_prediction_data(all_data)
  427. # 确定要分析的机组
  428. if unit_filter:
  429. unit_ids = [int(unit_filter.replace('RO', ''))]
  430. else:
  431. unit_ids = [1, 2, 3, 4]
  432. # 获取各机组的预测天数
  433. from cip.run_this import main as get_unit_days
  434. unit_days_dict = {}
  435. for unit_id in unit_ids:
  436. unit_days_dict[unit_id] = get_unit_days(unit_id, prediction_start_date)
  437. # 记录机组预测天数
  438. logger.log_unit_days(unit_days_dict)
  439. # 初始化预测器
  440. predictor = OptimalCIPPredictor(window_days=7, min_continuous_rising=3, min_delay_days=30)
  441. # 存储分析结果
  442. results = []
  443. # 遍历分析各机组
  444. for unit_id in unit_ids:
  445. # 获取该机组的预测天数
  446. predict_days = unit_days_dict[unit_id]
  447. print(f"\n[RO{unit_id}] 预测天数: {predict_days}天")
  448. # 记录分析开始
  449. logger.log_unit_analysis_start(unit_id, predict_days)
  450. # 截取预测天数范围内的数据
  451. end_time = all_data.index[0] + timedelta(days=predict_days)
  452. truncated_data = all_data.loc[all_data.index <= end_time]
  453. # 筛选该机组的压差列
  454. ro_name = f"RO{unit_id}"
  455. pressure_columns = [col for col in truncated_data.columns if ro_name in col and 'DPT' in col and 'pred' in col]
  456. if not pressure_columns:
  457. print(f"[RO{unit_id}] 警告: 未找到压差列")
  458. continue
  459. # 记录压差数据
  460. logger.log_unit_pressure_data(unit_id, truncated_data, pressure_columns)
  461. # 收集各段的CIP分析结果
  462. cip_results = []
  463. for column in pressure_columns:
  464. pressure_series = truncated_data[column].dropna()
  465. pressure_series.name = column
  466. # 数据点数检查:至少需要30天数据
  467. data_days = len(pressure_series) / 24
  468. print(f" {column}: 数据点数={len(pressure_series)}, 约{data_days:.1f}天")
  469. if len(pressure_series) < 30 * 24:
  470. print(f" [跳过] 数据不足30天")
  471. logger.log_cip_analysis_result(unit_id, column, None,
  472. {"error": f"数据不足: {len(pressure_series)}点 < 720点(30天)"})
  473. continue
  474. try:
  475. # 寻找最优CIP时机
  476. optimal_time, analysis = predictor.find_optimal_cip_time(pressure_series)
  477. # 记录分析结果(带详细诊断信息)
  478. if optimal_time:
  479. print(f" [成功] 找到CIP时机: {optimal_time.strftime('%Y-%m-%d %H:%M')}")
  480. else:
  481. print(f" [失败] 未找到CIP时机: {analysis.get('error', '未知原因')}")
  482. logger.log_cip_analysis_result(unit_id, column, optimal_time, analysis)
  483. if optimal_time:
  484. cip_results.append({
  485. 'column': column,
  486. 'time': optimal_time,
  487. 'delay_days': analysis['delay_days'],
  488. 'k_value': analysis['best_k']
  489. })
  490. except Exception as e:
  491. print(f" [异常] 分析失败: {str(e)}")
  492. logger.log_cip_analysis_result(unit_id, column, None, {"error": str(e)})
  493. # 根据separate_stages参数决定输出方式
  494. if cip_results:
  495. if separate_stages:
  496. # 方案1:分段输出每个段的独立CIP时机
  497. print(f"\n[RO{unit_id}] 分段输出模式:")
  498. for cip_result in cip_results:
  499. stage_name = extract_stage_name(cip_result['column'])
  500. unit_stage_name = f"RO{unit_id}-{stage_name}"
  501. stage_desc = f"独立分析 - {cip_result['column']} (k值={cip_result['k_value']:.6f}, 第{cip_result['delay_days']}天)"
  502. results.append({
  503. '机组类型': unit_stage_name,
  504. 'CIP时机': cip_result['time'],
  505. '策略说明': stage_desc
  506. })
  507. print(f" {unit_stage_name}: {cip_result['time'].strftime('%Y-%m-%d %H:%M:%S')}")
  508. # 记录日志
  509. logger.log_unit_strategy_result(unit_id, cip_result['time'], f"{stage_name} - {stage_desc}")
  510. else:
  511. # 原有逻辑:使用策略合并输出一个CIP时机(保留以备后用)
  512. optimal_time, strategy_desc = select_optimal_cip_time(cip_results, strategy)
  513. results.append({
  514. '机组类型': f"RO{unit_id}",
  515. 'CIP时机': optimal_time,
  516. '策略说明': strategy_desc
  517. })
  518. print(f"RO{unit_id} CIP时机: {optimal_time.strftime('%Y-%m-%d %H:%M:%S')}")
  519. logger.log_unit_strategy_result(unit_id, optimal_time, strategy_desc)
  520. else:
  521. # 如果没找到最优CIP时机,使用预测天数的最后时间作为CIP时机
  522. fallback_time = end_time
  523. fallback_desc = f"使用预测终点时间 (第{predict_days}天)"
  524. if separate_stages:
  525. # 分段模式:为每个段都输出备用时机
  526. print(f"\n[RO{unit_id}] 无有效CIP时机,使用备用策略")
  527. for stage_num in ['一段', '二段']:
  528. results.append({
  529. '机组类型': f"RO{unit_id}-{stage_num}",
  530. 'CIP时机': fallback_time,
  531. '策略说明': fallback_desc
  532. })
  533. print(f" RO{unit_id}-{stage_num}: {fallback_time.strftime('%Y-%m-%d %H:%M:%S')} (备用策略)")
  534. else:
  535. # 合并模式:输出一个备用时机
  536. results.append({
  537. '机组类型': f"RO{unit_id}",
  538. 'CIP时机': fallback_time,
  539. '策略说明': fallback_desc
  540. })
  541. print(f"RO{unit_id} CIP时机: {fallback_time.strftime('%Y-%m-%d %H:%M:%S')} (备用策略)")
  542. logger.log_unit_strategy_result(unit_id, fallback_time, fallback_desc)
  543. # 生成结果DataFrame
  544. result_df = pd.DataFrame(results)
  545. # 记录最终结果
  546. logger.log_final_results(result_df)
  547. # 生成分析图表
  548. logger.create_analysis_plots(all_data, unit_days_dict)
  549. print("\n" + "="*50)
  550. for _, row in result_df.iterrows():
  551. if pd.notna(row['CIP时机']):
  552. print(f"{row['机组类型']}: {row['CIP时机'].strftime('%Y-%m-%d %H:%M:%S')}")
  553. print("="*50)
  554. return result_df
  555. except Exception as e:
  556. logger.logger.error(f"分析过程中发生错误: {e}")
  557. raise
  558. finally:
  559. # 确保日志记录器正确关闭
  560. logger.close()
  561. def main(strategy=3, start_date=None, unit_filter=None, separate_stages=True, send_callback=True):
  562. """
  563. 主执行函数
  564. 功能:执行RO机组CIP时机分析并发送结果到回调接口
  565. Args:
  566. strategy: int,CIP时机选择策略(1-4),默认3(加权平均策略)
  567. 1: 最早时机策略
  568. 2: 最晚时机策略
  569. 3: 加权平均策略
  570. 4: 污染严重程度策略
  571. start_date: str,预测起始时间,格式'YYYY-MM-DD HH:MM:SS',默认None(使用当前时间)
  572. unit_filter: str,指定预测的机组,如'RO1',默认None(预测所有机组)
  573. separate_stages: bool,是否分段输出一段和二段的独立CIP时机,默认True
  574. True: 分别输出每个段的CIP时机(例如:RO1-一段、RO1-二段)
  575. False: 使用策略合并输出一个CIP时机(例如:RO1)
  576. send_callback: bool,是否发送回调,默认True
  577. 当从 smart_monitor 调用时应设为 False,避免重复发送
  578. Returns:
  579. pd.DataFrame: 分析结果
  580. 示例:
  581. result_df = main() # 使用默认参数(分段输出)
  582. result_df = main(separate_stages=False) # 使用策略合并
  583. result_df = main(start_date='2025-07-01 00:00:00') # 指定时间
  584. result_df = main(strategy=1, unit_filter='RO1') # 指定策略和机组
  585. """
  586. # 执行分析
  587. result_df = analyze_ro_unit_cip_timing(
  588. strategy=strategy,
  589. start_date=start_date,
  590. unit_filter=unit_filter,
  591. separate_stages=separate_stages
  592. )
  593. # 发送回调(如果启用)
  594. if send_callback and config and not result_df.empty:
  595. callback_success = send_decision_to_callback(result_df)
  596. if not callback_success:
  597. print(" 回调发送失败")
  598. return result_df
  599. def send_decision_to_callback(decision_data):
  600. """
  601. 将CIP决策结果发送到回调接口
  602. 功能:将分析结果按照API格式封装,通过HTTP POST发送到回调地址
  603. Args:
  604. decision_data: pd.DataFrame,决策数据,包含机组类型和CIP时机
  605. Returns:
  606. bool: 发送成功返回True,失败返回False
  607. """
  608. if config is None:
  609. print("配置文件未加载")
  610. return False
  611. try:
  612. # 获取项目ID
  613. project_id = config['scada']['project_id']
  614. # 构造回调数据
  615. callback_list = []
  616. if isinstance(decision_data, pd.DataFrame):
  617. for _, row in decision_data.iterrows():
  618. if pd.notna(row["CIP时机"]):
  619. # 从机组类型中提取段号信息
  620. unit_type = row["机组类型"]
  621. # 判断是一段还是二段
  622. if "一段" in unit_type:
  623. stage_num = 1
  624. elif "二段" in unit_type:
  625. stage_num = 2
  626. else:
  627. stage_num = 1 # 默认为1段(兼容旧格式)
  628. # 提取纯粹的机组编号(去掉"-一段"或"-二段")
  629. # 例如:"RO1-一段" -> "RO1","RO2-二段" -> "RO2"
  630. unit_name = unit_type.split('-')[0] if '-' in unit_type else unit_type
  631. callback_list.append({
  632. "type": unit_name,
  633. "project_id": project_id,
  634. "ctime": row["CIP时机"].strftime("%Y-%m-%d %H:%M:%S"),
  635. "ceb_backwash_frequency": stage_num
  636. })
  637. else:
  638. callback_list = [decision_data]
  639. # 关键检查:如果没有有效数据,不发送回调
  640. if not callback_list:
  641. return False
  642. # 封装为API要求的格式
  643. payload = {
  644. "list": callback_list
  645. }
  646. # 发送HTTP请求(带重试机制)
  647. max_retries = 3
  648. retry_interval = 10
  649. for attempt in range(1, max_retries + 1):
  650. try:
  651. response = requests.post(callback_url, headers=headers, json=payload, timeout=15)
  652. response.raise_for_status()
  653. print(f"回调发送成功")
  654. return True
  655. except requests.exceptions.RequestException as e:
  656. if attempt < max_retries:
  657. time.sleep(retry_interval)
  658. else:
  659. print(f"回调发送失败: {e}")
  660. return False
  661. except Exception as e:
  662. print(f"构建回调数据时出错: {e}")
  663. return False
  664. if __name__ == '__main__':
  665. # 示例调用
  666. # 方式1:分段输出模式(默认,推荐)
  667. # 分别输出RO1-一段、RO1-二段的CIP时机
  668. main(start_date='2025-10-26 00:00:00', unit_filter='RO1')
  669. # 方式2:合并输出模式(使用策略合并,保留以备后用)
  670. # main(start_date='2025-08-26 00:00:00', unit_filter='RO1', separate_stages=False, strategy=3)