# -*- coding: utf-8 -*-
"""
RO membrane fouling monitoring & CIP prediction system - logging module.

Features:
1. Record the input parameters of every analysis run
2. Persist prediction data and analysis results
3. Generate visualization plots
4. Export an HTML-format analysis report
"""
import os
import json
import logging
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path

# Font configuration so Chinese labels render correctly in matplotlib.
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False


class CIPAnalysisLogger:
    """
    CIP analysis session logger.

    Responsibilities:
    - record analysis session metadata,
    - persist input/output data,
    - generate analysis plots and reports.
    """

    def __init__(self, log_dir="analysis_logs", unit_filter=None):
        """
        Initialize the logger and create the per-session directory tree.

        Args:
            log_dir: str, base log directory, default "analysis_logs".
            unit_filter: str, unit filter such as 'RO1'; used in the
                session directory name ("ALL" when None).

        Directory layout:
            analysis_logs/
                CIP_RO1_20251105_155542/      # category_unit_timestamp
                    CIP_Analysis_20251105_155542.log
                    data/
                    plots/
                    reports/
                CIP_ALL_20251105_160000/      # all-units analysis
        """
        # Session id and timestamp shared by all artifacts of this run.
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.session_id = f"CIP_Analysis_{self.timestamp}"

        # parents=True so a nested base directory (e.g. "out/logs") also works.
        base_log_dir = Path(log_dir)
        base_log_dir.mkdir(parents=True, exist_ok=True)

        # Session directory name: CIP_<unit>_<timestamp>.
        unit_name = unit_filter if unit_filter else "ALL"
        dir_name = f"CIP_{unit_name}_{self.timestamp}"
        self.log_dir = base_log_dir / dir_name
        self.log_dir.mkdir(exist_ok=True)

        # Sub-directories under the session directory.
        self.data_dir = self.log_dir / "data"
        self.plots_dir = self.log_dir / "plots"
        self.reports_dir = self.log_dir / "reports"
        for dir_path in [self.data_dir, self.plots_dir, self.reports_dir]:
            dir_path.mkdir(exist_ok=True)

        self.setup_logging()

        # In-memory store for everything that goes into the JSON report.
        # "unit_days" is initialized here so the persisted schema is stable
        # even when log_unit_days() is never called.
        self.analysis_data = {
            "session_info": {
                "session_id": self.session_id,
                "start_time": datetime.now().isoformat(),
                "analysis_type": "CIP时机预测",
                "system_version": "v4_wuhan"
            },
            "input_parameters": {},
            "prediction_data": {},
            "unit_days": {},
            "unit_analysis": {},
            "final_results": {},
            "performance_metrics": {}
        }
        self.logger.info(f"CIP分析会话: {self.session_id}, 目录: {self.log_dir}")

    def setup_logging(self):
        """Configure a per-session logger writing to both file and console."""
        log_file = self.log_dir / f"{self.session_id}.log"

        self.logger = logging.getLogger(self.session_id)
        self.logger.setLevel(logging.INFO)
        # Do not propagate to parent loggers - avoids duplicate output.
        self.logger.propagate = False

        # Remove (and close) any pre-existing handlers to prevent duplicates
        # when the same session name is reused.
        if self.logger.handlers:
            for handler in self.logger.handlers[:]:
                handler.close()
                self.logger.removeHandler(handler)

        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.INFO)

        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)

        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    def log_input_parameters(self, strategy, start_date, prediction_start_date=None):
        """
        Record the analysis input parameters.

        Args:
            strategy: int, strategy number.
            start_date: str, analysis start time.
            prediction_start_date: datetime, prediction start time (optional).
        """
        params = {
            "strategy": strategy,
            "start_date": start_date,
            "prediction_start_date": prediction_start_date.isoformat() if prediction_start_date else None,
            "analysis_timestamp": datetime.now().isoformat()
        }
        self.analysis_data["input_parameters"] = params
        pred_start = f", 预测起始: {prediction_start_date}" if prediction_start_date else ""
        self.logger.info(f"参数 - 策略: {strategy}, 起始: {start_date}{pred_start}")

    def log_prediction_data(self, all_data):
        """
        Record an overview of the prediction data and persist it as CSV.

        Args:
            all_data: pd.DataFrame, prediction data (datetime-like index assumed).
        """
        try:
            data_info = {
                "shape": list(all_data.shape),
                "time_range": {
                    "start": all_data.index.min().isoformat(),
                    "end": all_data.index.max().isoformat()
                },
                "columns": list(all_data.columns),
                "data_points": len(all_data),
                "missing_values": all_data.isnull().sum().to_dict()
            }
            self.analysis_data["prediction_data"] = data_info

            data_file = self.data_dir / f"{self.session_id}_prediction_data.csv"
            all_data.to_csv(data_file)
            self.logger.info(
                f"预测数据 - 形状: {data_info['shape']}, 点数: {data_info['data_points']}, 已保存: {data_file.name}"
            )
        except Exception as e:
            self.logger.error(f"记录预测数据失败: {e}")

    def log_unit_days(self, unit_days_dict):
        """
        Record the prediction horizon (days) per unit.

        Args:
            unit_days_dict: dict, unit id -> prediction days.
        """
        self.analysis_data["unit_days"] = unit_days_dict
        days_str = ", ".join([f"RO{uid}:{days}天" for uid, days in unit_days_dict.items()])
        self.logger.info(f"预测天数 - {days_str}")

    def log_unit_analysis_start(self, unit_id, predict_days):
        """
        Record the start of a per-unit analysis.

        Args:
            unit_id: int, unit id.
            predict_days: int, prediction horizon in days.
        """
        self.logger.info(f"[RO{unit_id}] 开始分析, 预测周期: {predict_days}天")
        if unit_id not in self.analysis_data["unit_analysis"]:
            self.analysis_data["unit_analysis"][unit_id] = {}
        self.analysis_data["unit_analysis"][unit_id]["predict_days"] = predict_days
        self.analysis_data["unit_analysis"][unit_id]["analysis_start"] = datetime.now().isoformat()

    def log_unit_pressure_data(self, unit_id, truncated_data, pressure_columns):
        """
        Record a unit's differential-pressure data and persist it as CSV.

        Args:
            unit_id: int, unit id.
            truncated_data: pd.DataFrame, data truncated to the prediction window.
            pressure_columns: list, differential-pressure column names.
        """
        try:
            unit_data = {
                "pressure_columns": pressure_columns,
                "data_shape": list(truncated_data.shape),
                "data_range": {
                    "start": truncated_data.index.min().isoformat(),
                    "end": truncated_data.index.max().isoformat()
                }
            }
            self.analysis_data["unit_analysis"][unit_id]["pressure_data"] = unit_data

            cols_str = ", ".join(pressure_columns)
            self.logger.info(f"[RO{unit_id}] 压差列: {len(pressure_columns)}个 ({cols_str})")

            unit_data_file = self.data_dir / f"{self.session_id}_RO{unit_id}_pressure_data.csv"
            truncated_data[pressure_columns].to_csv(unit_data_file)
        except Exception as e:
            self.logger.error(f"记录RO{unit_id}压差数据失败: {e}")

    def log_cip_analysis_result(self, unit_id, column, optimal_time, analysis):
        """
        Record a CIP analysis result, including diagnostic info when present.

        Args:
            unit_id: int, unit id.
            column: str, differential-pressure column name.
            optimal_time: pd.Timestamp or None, optimal CIP time.
            analysis: dict, analysis result (may carry diagnostic keys:
                'error', 'hint', 'valid_k_count', 'rising_periods_count',
                'data_days').
        """
        try:
            if "cip_results" not in self.analysis_data["unit_analysis"][unit_id]:
                self.analysis_data["unit_analysis"][unit_id]["cip_results"] = []

            result = {
                "column": column,
                "optimal_time": optimal_time.isoformat() if optimal_time else None,
                "delay_days": analysis.get("delay_days"),
                "k_value": analysis.get("best_k"),
                "analysis_details": analysis
            }
            self.analysis_data["unit_analysis"][unit_id]["cip_results"].append(result)

            if optimal_time:
                self.logger.info(
                    f"  {column}: {optimal_time} (第{analysis['delay_days']}天, k={analysis['best_k']:.6f})"
                )
            else:
                error_msg = analysis.get('error', '未知原因')
                self.logger.info(f"  {column}: 未找到CIP时机 - {error_msg}")
                # Verbose diagnostics, emitted only when the analyzer supplied them.
                if 'hint' in analysis:
                    self.logger.info(f"    提示: {analysis['hint']}")
                if 'valid_k_count' in analysis:
                    self.logger.info(f"    有效k值数量: {analysis['valid_k_count']}")
                if 'rising_periods_count' in analysis:
                    self.logger.info(f"    上升趋势段数: {analysis['rising_periods_count']}")
                if 'data_days' in analysis:
                    self.logger.info(f"    数据覆盖天数: {analysis['data_days']}天")
        except Exception as e:
            self.logger.error(f"记录CIP分析结果失败: {e}")

    def log_unit_strategy_result(self, unit_id, optimal_time, strategy_desc):
        """
        Record the strategy-selection outcome for a unit.

        Args:
            unit_id: int, unit id.
            optimal_time: pd.Timestamp or None, optimal CIP time.
            strategy_desc: str, strategy description.
        """
        try:
            strategy_result = {
                "optimal_time": optimal_time.isoformat() if optimal_time else None,
                "strategy_description": strategy_desc,
                "selection_timestamp": datetime.now().isoformat()
            }
            self.analysis_data["unit_analysis"][unit_id]["strategy_result"] = strategy_result
            self.logger.info(f"[RO{unit_id}] 最优CIP时机: {optimal_time}, 策略: {strategy_desc}")
        except Exception as e:
            self.logger.error(f"记录RO{unit_id}策略结果失败: {e}")

    def log_final_results(self, result_df):
        """
        Record the final result table and persist it as CSV.

        Args:
            result_df: pd.DataFrame with columns "机组类型", "CIP时机", "策略说明".
        """
        try:
            # Convert to a JSON-serializable list of dicts.
            final_results = []
            for _, row in result_df.iterrows():
                result = {
                    "unit": row["机组类型"],
                    "cip_time": row["CIP时机"].isoformat() if pd.notna(row["CIP时机"]) else None,
                    "strategy_description": row["策略说明"]
                }
                final_results.append(result)
            self.analysis_data["final_results"] = final_results

            result_file = self.data_dir / f"{self.session_id}_final_results.csv"
            result_df.to_csv(result_file, index=False, encoding='utf-8')

            results_summary = ", ".join(
                [f"{r['unit']}: {r['cip_time'] or 'N/A'}" for r in final_results]
            )
            self.logger.info(f"最终结果 - {results_summary}, 已保存: {result_file.name}")
        except Exception as e:
            self.logger.error(f"记录最终结果失败: {e}")

    def create_analysis_plots(self, all_data, unit_days_dict):
        """
        Create analysis plots: a pressure-trend overview and a unit comparison.

        Args:
            all_data: pd.DataFrame, full prediction data.
            unit_days_dict: dict, unit id -> prediction days.
        """
        try:
            # 2x2 overview: one panel per unit RO1..RO4.
            fig, axes = plt.subplots(2, 2, figsize=(20, 12))
            fig.suptitle(f'RO膜污染分析总览 - {self.session_id}',
                         fontsize=16, fontweight='bold')

            for idx, unit_id in enumerate([1, 2, 3, 4]):
                ax = axes[idx // 2, idx % 2]

                # Select this unit's differential-pressure (DPT) columns.
                unit_columns = [col for col in all_data.columns
                                if f'RO{unit_id}' in col and 'DPT' in col]

                if unit_columns:
                    # Truncate to the unit's prediction window (default 90 days).
                    predict_days = unit_days_dict.get(unit_id, 90)
                    end_time = all_data.index[0] + timedelta(days=predict_days)
                    truncated_data = all_data.loc[all_data.index <= end_time]

                    for col in unit_columns:
                        if col in truncated_data.columns:
                            stage = "一段" if "DPT_1" in col else "二段"
                            ax.plot(truncated_data.index, truncated_data[col],
                                    label=f'{stage}压差', linewidth=2, alpha=0.8)

                    ax.set_title(f'RO{unit_id} 压差趋势 (预测{predict_days}天)',
                                 fontweight='bold')
                    ax.set_xlabel('时间')
                    ax.set_ylabel('压差 (MPa)')
                    ax.legend()
                    ax.grid(True, alpha=0.3)
                    ax.tick_params(axis='x', rotation=45)
                else:
                    # No data for this unit: show a placeholder panel.
                    ax.text(0.5, 0.5, f'RO{unit_id}\n无压差数据',
                            ha='center', va='center', transform=ax.transAxes,
                            fontsize=14,
                            bbox=dict(boxstyle="round,pad=0.3", facecolor="lightgray"))
                    ax.set_title(f'RO{unit_id} - 无数据')

            plt.tight_layout()

            plot_file = self.plots_dir / f"{self.session_id}_pressure_trends.png"
            plt.savefig(plot_file, dpi=300, bbox_inches='tight')
            plt.close()

            comparison_plot = self._create_unit_comparison_plot(unit_days_dict)
            self.logger.info(f"分析图表 - 已保存: {plot_file.name}, {comparison_plot}")
        except Exception as e:
            self.logger.error(f"创建分析图表失败: {e}")

    def _create_unit_comparison_plot(self, unit_days_dict):
        """Create the per-unit comparison plot; return its file name (or None)."""
        try:
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

            # Left panel: prediction horizon per unit.
            units = list(unit_days_dict.keys())
            days = list(unit_days_dict.values())
            bars = ax1.bar([f'RO{u}' for u in units], days,
                           color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728'])
            ax1.set_title('各机组预测周期对比', fontweight='bold')
            ax1.set_ylabel('预测天数')
            ax1.grid(True, alpha=0.3)

            for bar, day in zip(bars, days):
                height = bar.get_height()
                ax1.text(bar.get_x() + bar.get_width() / 2., height + 1,
                         f'{day}天', ha='center', va='bottom', fontweight='bold')

            # Right panel: recommended CIP timing distribution (if results exist).
            if self.analysis_data.get("final_results"):
                cip_times = []
                units_with_cip = []
                for result in self.analysis_data["final_results"]:
                    if result["cip_time"]:
                        cip_times.append(pd.to_datetime(result["cip_time"]))
                        units_with_cip.append(result["unit"])

                if cip_times:
                    # Days measured from the earliest recommended CIP time.
                    start_time = min(cip_times)
                    days_from_start = [(t - start_time).days for t in cip_times]

                    bars2 = ax2.bar(units_with_cip, days_from_start,
                                    color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728'][:len(units_with_cip)])
                    ax2.set_title('各机组CIP建议时机', fontweight='bold')
                    ax2.set_ylabel('距离预测开始天数')
                    ax2.grid(True, alpha=0.3)

                    for bar, day in zip(bars2, days_from_start):
                        height = bar.get_height()
                        ax2.text(bar.get_x() + bar.get_width() / 2., height + 0.5,
                                 f'{day}天', ha='center', va='bottom', fontweight='bold')
                else:
                    ax2.text(0.5, 0.5, '无有效CIP时机',
                             ha='center', va='center', transform=ax2.transAxes,
                             fontsize=14,
                             bbox=dict(boxstyle="round,pad=0.3", facecolor="lightcoral"))
            else:
                ax2.text(0.5, 0.5, '分析未完成',
                         ha='center', va='center', transform=ax2.transAxes,
                         fontsize=14,
                         bbox=dict(boxstyle="round,pad=0.3", facecolor="lightgray"))

            plt.tight_layout()

            comparison_file = self.plots_dir / f"{self.session_id}_unit_comparison.png"
            plt.savefig(comparison_file, dpi=300, bbox_inches='tight')
            plt.close()
            return comparison_file.name
        except Exception as e:
            self.logger.error(f"创建机组对比图失败: {e}")
            return None

    def generate_analysis_report(self):
        """
        Generate the analysis report.

        Steps:
        1. Save the complete analysis data as JSON.
        2. Generate the HTML report.
        3. Log a session summary.
        """
        try:
            self.analysis_data["session_info"]["end_time"] = datetime.now().isoformat()

            json_file = self.reports_dir / f"{self.session_id}_analysis_data.json"
            with open(json_file, 'w', encoding='utf-8') as f:
                json.dump(self.analysis_data, f, ensure_ascii=False, indent=2)

            html_report = self._generate_html_report()
            html_file = self.reports_dir / f"{self.session_id}_report.html"
            with open(html_file, 'w', encoding='utf-8') as f:
                f.write(html_report)

            self.logger.info(f"分析报告 - 已保存: {json_file.name}, {html_file.name}")

            self._log_session_summary()
        except Exception as e:
            self.logger.error(f"生成分析报告失败: {e}")

    def _generate_html_report(self):
        """Build and return the HTML analysis report as a string.

        NOTE(review): the original HTML template was garbled in the source
        file; this markup is reconstructed from the surviving text fragments
        (session info, input parameters, result table, directory note) -
        verify layout against the original report before relying on it.
        """
        session_info = self.analysis_data["session_info"]
        input_params = self.analysis_data["input_parameters"]
        final_results = self.analysis_data.get("final_results") or []
        if isinstance(final_results, dict):
            # Schema default is {} until log_final_results() runs.
            final_results = []

        rows = []
        for result in final_results:
            cip_time_display = result["cip_time"] or "N/A"
            status_text = "已确定" if result["cip_time"] else "未确定"
            rows.append(
                f"<tr><td>{result['unit']}</td><td>{cip_time_display}</td>"
                f"<td>{result['strategy_description']}</td><td>{status_text}</td></tr>"
            )
        results_table = "\n".join(rows) if rows else "<tr><td colspan='4'>N/A</td></tr>"

        html = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>CIP分析报告 - {session_info['session_id']}</title>
</head>
<body>
<h1>RO膜污染监控与CIP预测系统 - 分析报告</h1>
<h2>会话信息</h2>
<p>会话ID: {session_info['session_id']}</p>
<p>开始时间: {session_info.get('start_time', 'N/A')}</p>
<p>结束时间: {session_info.get('end_time', 'N/A')}</p>
<p>系统版本: {session_info['system_version']}</p>
<h2>输入参数</h2>
<p>策略: {input_params.get('strategy', 'N/A')}</p>
<p>起始时间: {input_params.get('start_date', 'N/A')}</p>
<p>预测起始: {input_params.get('prediction_start_date', 'N/A')}</p>
<h2>分析结果</h2>
<table border="1" cellpadding="4" cellspacing="0">
<tr><th>机组</th><th>CIP建议时机</th><th>策略说明</th><th>状态</th></tr>
{results_table}
</table>
<h2>文件目录</h2>
<p>本次分析生成的所有数据文件和图表已保存在以下目录:</p>
<p>{self.log_dir}</p>
<p>本报告基于RO膜污染监控与CIP预测系统生成,包含了完整的分析过程记录。工艺人员可以根据以下内容进行合理性排查:</p>
</body>
</html>"""
        return html

    def _log_session_summary(self):
        """Log a short end-of-session summary.

        NOTE(review): this method is referenced by generate_analysis_report()
        but its original body is not visible in the source; this is a minimal
        reconstruction and is safely overridden if the original definition
        appears later in the class body.
        """
        session_info = self.analysis_data["session_info"]
        final_results = self.analysis_data.get("final_results") or []
        n_results = len(final_results) if isinstance(final_results, list) else 0
        self.logger.info(
            f"会话结束 - {session_info['session_id']}, 结果数: {n_results}, 目录: {self.log_dir}"
        )