# -*- coding: utf-8 -*- """ RO膜污染监控与CIP预测 - 基于预测数据的最优时机分析 核心功能:分析RO膜压差预测数据,计算最优CIP清洗时机 CIP时机选择策略: 1. 最早时机策略:一段或二段任一需要CIP时即触发 2. 最晚时机策略:等待所有段都需要CIP时触发 3. 加权平均策略:综合两段污染程度,污染严重段权重更大 4. 污染严重程度策略:基于k值最大的段决策 使用方法: main(strategy=3) # 使用策略3 main(strategy=1, start_date='2025-08-26 00:00:00') # 指定策略和时间 """ import pandas as pd import numpy as np import logging from logging.handlers import RotatingFileHandler from sklearn.linear_model import LinearRegression from fouling_model_0922.predict import Predictor import warnings from datetime import datetime, timedelta from logging_system import CIPAnalysisLogger import json import requests import time import os warnings.filterwarnings('ignore', category=FutureWarning) # 日志系统配置 logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # 日志输出格式 formatter = logging.Formatter( '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 文件日志处理器,单个文件最大5MB,保留3个备份 file_handler = RotatingFileHandler('main_simple.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8') file_handler.setFormatter(formatter) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) # 添加处理器 logger.addHandler(file_handler) logger.addHandler(console_handler) # 加载配置文件 def load_config(): """ 加载配置文件 Returns: dict: 配置字典,失败时返回None """ config_path = os.path.join(os.path.dirname(__file__), 'config.json') try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config except Exception as e: logger.error(f"配置文件加载失败: {e}") return None # 加载配置 config = load_config() def update_cip_history_in_config(result_df): """ 保存CIP预测结果到配置文件 功能:将预测的CIP时机写入config.json的cip_times字段的predicted_time Args: result_df: DataFrame,包含机组类型和CIP时机两列 Returns: bool: 保存成功返回True,失败返回False 注意: 此函数已废弃,smart_monitor会自动保存predicted_time 保留此函数仅为兼容性 """ global config if config is None: print("配置文件未加载") return False try: config_path = os.path.join(os.path.dirname(__file__), 'config.json') with open(config_path, 'r', encoding='utf-8') as f: current_config = json.load(f) # 遍历结果,写入配置 updated_units = [] for _, row in result_df.iterrows(): if pd.notna(row["CIP时机"]): unit_name = row["机组类型"] cip_time = row["CIP时机"].strftime('%Y-%m-%d %H:%M:%S') if unit_name in current_config.get('cip_times', {}): # 新格式:只更新predicted_time if isinstance(current_config['cip_times'][unit_name], dict): current_config['cip_times'][unit_name]['predicted_time'] = cip_time else: # 兼容旧格式:转换为新格式 current_config['cip_times'][unit_name] = { 'actual_time': current_config['cip_times'][unit_name], 'predicted_time': cip_time } updated_units.append(f"{unit_name}: {cip_time}") if updated_units: with open(config_path, 'w', encoding='utf-8') as f: json.dump(current_config, f, ensure_ascii=False, indent=2) config = current_config return True else: return False except Exception as e: logger.error(f"保存CIP预测时间失败: {e}") return False def validate_data(data, name="数据"): """ 验证时间序列数据格式 检查项: 1. 数据非空 2. 索引类型为DatetimeIndex Args: data: pd.Series或pd.DataFrame name: 数据名称,用于错误提示 Returns: bool: 验证通过返回True Raises: ValueError: 验证失败时抛出 """ if data is None or data.empty: raise ValueError(f"{name}为空或无效") if not isinstance(data.index, pd.DatetimeIndex): raise ValueError(f"{name}的索引必须是时间格式") return True class OptimalCIPPredictor: """ CIP最优时机预测器 工作原理: 1. 使用滑动窗口计算k值(膜污染速率) 2. 识别k值连续上升趋势 3. 在满足时间约束前提下,选择k值最大的时间点 参数说明: - window_days: 滑动窗口大小(天),用于线性回归计算k值 - min_continuous_rising: 最小连续上升点数,确保趋势稳定 - min_delay_days: 最小延迟天数,避免过早建议CIP """ def __init__(self, window_days=7, min_continuous_rising=3, min_delay_days=30): """ 初始化预测器 Args: window_days: 滑动窗口天数(默认7天) min_continuous_rising: 最小连续上升点数(默认3点) min_delay_days: 最小延迟天数(默认30天) """ self.window_days = window_days self.window_hours = window_days * 24 # 转换为小时 self.min_continuous_rising = min_continuous_rising self.min_delay_days = min_delay_days def calculate_sliding_k_values(self, pressure_series): """ 计算滑动窗口k值序列 基于机理模型: ΔP(t) = ΔP₀ + k×t 通过线性回归计算斜率k,表示膜污染速率 Args: pressure_series: pd.Series,压差时间序列,索引为时间 Returns: pd.Series: k值序列,前window_hours个值为NaN """ # 初始化k值序列,索引与输入数据保持一致 k_values = pd.Series(index=pressure_series.index, dtype=float) # 滑动窗口遍历,从第window_hours个点开始 for i in range(self.window_hours, len(pressure_series)): # 取当前窗口内的数据 window_data = pressure_series.iloc[i-self.window_hours:i] # 数据质量检查:窗口内至少80%的数据有效 if len(window_data) < self.window_hours * 0.8: continue # 构造时间点序列 [0, 1, 2, ..., window_hours-1] time_points = np.arange(len(window_data)).reshape(-1, 1) try: # 线性回归拟合:y = a + k*x,取斜率k作为污染速率 model = LinearRegression() model.fit(time_points, window_data.values) k = model.coef_[0] k_values.iloc[i] = k except: # 回归失败时跳过该点 continue return k_values def find_continuous_rising_periods(self, k_values): """ 识别k值连续上升的时间段 遍历k值序列,找出所有连续上升的区间 只保留持续时间大于等于min_continuous_rising的区间 Args: k_values: pd.Series,k值序列 Returns: list: 连续上升时间段列表,格式 [(start_idx, end_idx, duration), ...] """ rising_periods = [] start_idx = None # 当前上升段的起始索引 # 遍历k值序列,寻找连续上升段 for i in range(1, len(k_values)): # 跳过缺失值 if pd.isna(k_values.iloc[i]) or pd.isna(k_values.iloc[i-1]): start_idx = None continue # 判断k值是否上升 if k_values.iloc[i] > k_values.iloc[i-1]: # 开始新的上升段 if start_idx is None: start_idx = i-1 else: # k值不再上升,结束当前上升段 if start_idx is not None: duration = i - start_idx # 只保留持续时间足够长的上升段 if duration >= self.min_continuous_rising: rising_periods.append((start_idx, i-1, duration)) start_idx = None # 处理序列末尾的上升趋势 if start_idx is not None: duration = len(k_values) - start_idx # 持续时间 if duration >= self.min_continuous_rising: # 持续时间足够长 rising_periods.append((start_idx, len(k_values)-1, duration)) # 添加上升时间段 return rising_periods def find_optimal_cip_time(self, pressure_series): """ 最优CIP时机 核心步骤: 1. 计算滑动窗口k值(膜污染速率) 2. 识别k值连续上升的时间段 3. 应用时间约束(距离起点至少min_delay_days天) 4. 在有效时间段内选择k值最大的时间点 Args: pressure_series: pd.Series,压差时间序列 Returns: tuple: (optimal_time, analysis_result) - optimal_time: pd.Timestamp,最优CIP时间,失败时返回None - analysis_result: dict,分析结果详情 """ # 步骤1:计算滑动k值 k_values = self.calculate_sliding_k_values(pressure_series) valid_k_count = k_values.dropna().shape[0] # 检查:k值数量是否足够 if valid_k_count < 10: return None, {"error": "有效k值数量不足"} # 步骤2:识别连续上升时间段 rising_periods = self.find_continuous_rising_periods(k_values) if not rising_periods: return None, {"error": "未发现连续上升趋势"} # 步骤3:应用时间约束,筛选有效时间段 min_delay_time = pressure_series.index[0] + timedelta(days=self.min_delay_days) valid_periods = [] for start_idx, end_idx, duration in rising_periods: period_start_time = pressure_series.index[start_idx] period_end_time = pressure_series.index[end_idx] # 检查时间段是否在约束范围内 if period_end_time >= min_delay_time: if period_start_time < min_delay_time: # 时间段部分在约束范围内,截取有效部分 delay_idx = pressure_series.index.get_indexer([min_delay_time], method='nearest')[0] if delay_idx <= end_idx: valid_periods.append((delay_idx, end_idx, end_idx - delay_idx + 1)) else: # 时间段完全在约束范围内 valid_periods.append((start_idx, end_idx, duration)) if not valid_periods: return None, {"error": f"无满足时间约束的上升趋势(需>={self.min_delay_days}天后)"} # 步骤4:在有效时间段内寻找k值最大的点 best_time = None best_k = -np.inf for start_idx, end_idx, duration in valid_periods: period_k_values = k_values.iloc[start_idx:end_idx+1] max_k_idx = period_k_values.idxmax() # k值最大点的索引 max_k_value = period_k_values.max() # k值最大值 if max_k_value > best_k: best_k = max_k_value best_time = max_k_idx # 构建分析结果 analysis_result = { "success": True, "delay_days": (best_time - pressure_series.index[0]).days, "best_k": float(best_k) } return best_time, analysis_result def select_optimal_cip_strategy_1(cip_results): """ 策略1:最早时机策略 选择逻辑:取一段和二段中较早需要CIP的时机 适用场景:保守运维,及时维护 """ if not cip_results: return None, "无有效CIP时机" earliest_result = min(cip_results, key=lambda x: x['delay_days']) return earliest_result['time'], f"最早时机策略 - {earliest_result['column']} (第{earliest_result['delay_days']}天)" def select_optimal_cip_strategy_2(cip_results): """ 策略2:最晚时机策略 选择逻辑:取一段和二段中较晚需要CIP的时机 适用场景:最大化运行时间 """ if not cip_results: return None, "无有效CIP时机" latest_result = max(cip_results, key=lambda x: x['delay_days']) return latest_result['time'], f"最晚时机策略 - {latest_result['column']} (第{latest_result['delay_days']}天)" def select_optimal_cip_strategy_3(cip_results): """ 策略3:加权平均策略(推荐) 选择逻辑:根据k值对各段CIP时机加权,污染严重段权重更大 适用场景:平衡运行时间和维护需求 """ if not cip_results: return None, "无有效CIP时机" if len(cip_results) == 1: result = cip_results[0] return result['time'], f"单段加权策略 - {result['column']} (第{result['delay_days']}天)" # 计算加权平均天数 total_weight = sum(result['k_value'] for result in cip_results) weighted_days = sum(result['delay_days'] * result['k_value'] for result in cip_results) / total_weight # 找最接近加权平均天数的时机 target_days = int(round(weighted_days)) closest_result = min(cip_results, key=lambda x: abs(x['delay_days'] - target_days)) return closest_result['time'], f"加权平均策略 - {closest_result['column']} (目标第{target_days}天,实际第{closest_result['delay_days']}天)" def select_optimal_cip_strategy_4(cip_results): """ 策略4:污染严重程度策略 选择逻辑:选择k值最大(污染最严重)的段的CIP时机 适用场景:基于实际污染状况决策 """ if not cip_results: return None, "无有效CIP时机" max_k_result = max(cip_results, key=lambda x: x['k_value']) return max_k_result['time'], f"污染严重程度策略 - {max_k_result['column']} (k值={max_k_result['k_value']:.6f}, 第{max_k_result['delay_days']}天)" def select_optimal_cip_time(cip_results, strategy=1): """ 根据指定策略选择最优CIP时机 Args: cip_results: list,各段CIP分析结果列表,每个元素包含time、delay_days、k_value等字段 strategy: int,策略编号,1-4分别对应不同策略 Returns: tuple: (optimal_time, description) - optimal_time: pd.Timestamp,最优CIP时间 - description: str,策略描述 Raises: ValueError: 当策略编号无效时抛出 """ strategy_map = { 1: select_optimal_cip_strategy_1, # 最早时机 2: select_optimal_cip_strategy_2, # 最晚时机 3: select_optimal_cip_strategy_3, # 加权平均(推荐) 4: select_optimal_cip_strategy_4 # 污染严重程度 } if strategy not in strategy_map: raise ValueError(f"无效策略编号: {strategy},支持的策略: 1-4") return strategy_map[strategy](cip_results) def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None): """ 分析RO机组的最优CIP时间 功能: 1. 获取压差预测数据 2. 分析各机组各段的CIP时机 3. 根据策略选择最优CIP时间 Args: strategy: int,CIP时机选择策略(1-4) 1: 最早时机策略 2: 最晚时机策略 3: 加权平均策略(推荐) 4: 污染严重程度策略 start_date: str,预测起始时间,格式'YYYY-MM-DD HH:MM:SS',默认使用当前时间 unit_filter: str,指定分析的机组,如'RO1',默认分析所有机组 Returns: pd.DataFrame: 包含机组类型、CIP时机、策略说明的结果表 """ # 初始化日志记录器 analysis_logger = CIPAnalysisLogger() try: # 获取预测数据 try: all_data = Predictor().predict(start_date=start_date) # 获取预测数据 if all_data.empty: analysis_logger.logger.error("预测数据为空") return pd.DataFrame() except Exception as e: analysis_logger.logger.error(f"获取预测数据失败: {e}") return pd.DataFrame() # 将date列设置为索引 all_data = all_data.set_index('date') # 获取预测数据的起始时间 prediction_start_date = all_data.index[0].to_pydatetime() logger.info(f"预测起始: {prediction_start_date.strftime('%Y-%m-%d %H:%M:%S')}") # 记录输入参数和预测数据 analysis_logger.log_input_parameters(strategy, start_date, prediction_start_date) analysis_logger.log_prediction_data(all_data) # 确定要分析的机组 if unit_filter: unit_ids = [int(unit_filter.replace('RO', ''))] else: unit_ids = [1, 2, 3, 4] # 获取各机组的预测天数 from cip.run_this import main as get_unit_days unit_days_dict = {} for unit_id in unit_ids: unit_days_dict[unit_id] = get_unit_days(unit_id, prediction_start_date) # 记录机组预测天数 analysis_logger.log_unit_days(unit_days_dict) # 初始化预测器 predictor = OptimalCIPPredictor(window_days=7, min_continuous_rising=3, min_delay_days=30) # 存储分析结果 results = [] # 遍历分析各机组 for unit_id in unit_ids: # 获取该机组的预测天数 predict_days = unit_days_dict[unit_id] # 记录分析开始 analysis_logger.log_unit_analysis_start(unit_id, predict_days) # 截取预测天数范围内的数据 end_time = all_data.index[0] + timedelta(days=predict_days) truncated_data = all_data.loc[all_data.index <= end_time] # 筛选该机组的压差列 ro_name = f"RO{unit_id}" pressure_columns = [col for col in truncated_data.columns if ro_name in col and 'DPT' in col and 'pred' in col] if not pressure_columns: logger.warning(f"[{ro_name}] 未找到压差列") continue # 记录压差数据 analysis_logger.log_unit_pressure_data(unit_id, truncated_data, pressure_columns) # 收集各段的CIP分析结果 cip_results = [] for column in pressure_columns: pressure_series = truncated_data[column].dropna() pressure_series.name = column # 数据点数检查:至少需要30天数据 if len(pressure_series) < 30 * 24: continue try: # 寻找最优CIP时机 optimal_time, analysis = predictor.find_optimal_cip_time(pressure_series) # 记录分析结果 analysis_logger.log_cip_analysis_result(unit_id, column, optimal_time, analysis) if optimal_time: cip_results.append({ 'column': column, 'time': optimal_time, 'delay_days': analysis['delay_days'], 'k_value': analysis['best_k'] }) except Exception as e: analysis_logger.log_cip_analysis_result(unit_id, column, None, {"error": str(e)}) # 根据策略选择最优CIP时机 if cip_results: optimal_time, strategy_desc = select_optimal_cip_time(cip_results, strategy) results.append({ '机组类型': f"RO{unit_id}", 'CIP时机': optimal_time, '策略说明': strategy_desc }) logger.info(f"[RO{unit_id}] CIP时机 → {optimal_time.strftime('%Y-%m-%d %H:%M:%S')}") analysis_logger.log_unit_strategy_result(unit_id, optimal_time, strategy_desc) else: results.append({ '机组类型': f"RO{unit_id}", 'CIP时机': None, '策略说明': "无有效CIP时机" }) logger.warning(f"[RO{unit_id}] 未找到有效CIP时机") analysis_logger.log_unit_strategy_result(unit_id, None, "无有效CIP时机") # 生成结果DataFrame result_df = pd.DataFrame(results) # 记录最终结果 analysis_logger.log_final_results(result_df) # 生成分析图表 analysis_logger.create_analysis_plots(all_data, unit_days_dict) return result_df except Exception as e: analysis_logger.logger.error(f"分析过程中发生错误: {e}") raise finally: # 确保日志记录器正确关闭 analysis_logger.close() def main(strategy=3, start_date=None, unit_filter=None): """ 主执行函数 功能:执行RO机组CIP时机分析并发送结果到回调接口 Args: strategy: int,CIP时机选择策略(1-4),默认3(加权平均策略) 1: 最早时机策略 2: 最晚时机策略 3: 加权平均策略 4: 污染严重程度策略 start_date: str,预测起始时间,格式'YYYY-MM-DD HH:MM:SS',默认None(使用当前时间) unit_filter: str,指定预测的机组,如'RO1',默认None(预测所有机组) Returns: pd.DataFrame: 分析结果 示例: result_df = main() # 使用默认参数 result_df = main(start_date='2025-07-01 00:00:00') # 指定时间 result_df = main(strategy=1, unit_filter='RO1') # 指定策略和机组 """ # 执行分析 result_df = analyze_ro_unit_cip_timing(strategy=strategy, start_date=start_date, unit_filter=unit_filter) # 发送回调 if config and not result_df.empty: callback_success = send_decision_to_callback(result_df) if not callback_success: logger.error("决策结果发送失败") return result_df def send_decision_to_callback(decision_data): """ 将CIP决策结果发送到回调接口 功能:将分析结果按照API格式封装,通过HTTP POST发送到回调地址 Args: decision_data: pd.DataFrame,决策数据,包含机组类型和CIP时机 Returns: bool: 发送成功返回True,失败返回False """ if config is None: logger.error("配置文件未加载") return False try: # 构建回调URL callback_url = config['api']['base_url'] + config['api']['callback_endpoint'] # 设置请求头 headers = { "Content-Type": "application/json", "JWT-TOKEN": config['api']['jwt_token'] } # 获取项目ID project_id = config['scada']['project_id'] # 构造回调数据 callback_list = [] if isinstance(decision_data, pd.DataFrame): for _, row in decision_data.iterrows(): if pd.notna(row["CIP时机"]): callback_list.append({ "type": row["机组类型"], "project_id": project_id, "ctime": row["CIP时机"].strftime("%Y-%m-%d %H:%M:%S") }) else: callback_list = [decision_data] # 封装为API要求的格式 payload = { "list": callback_list } # 发送HTTP请求(带重试机制) max_retries = 3 retry_interval = 10 log_type = callback_list[0]["type"] if callback_list else "UNKNOWN" for attempt in range(1, max_retries + 1): try: response = requests.post(callback_url, headers=headers, json=payload, timeout=15) response.raise_for_status() logger.info(f"[{log_type}] 回调成功") return True except requests.exceptions.RequestException as e: if attempt < max_retries: time.sleep(retry_interval) else: logger.error(f"[{log_type}] 回调失败(重试{max_retries}次): {e}") return False except Exception as e: logger.error(f"构建回调数据时出错: {e}") return False if __name__ == '__main__': # 示例调用 # main() # 使用当前时间 main(start_date='2025-08-26 00:00:00', unit_filter='RO1') # 使用历史时间