Browse Source

1:新增一段二段逻辑
2:优化调用代码逻辑

wmy 1 week ago
parent
commit
3d24dda126
7 changed files with 427 additions and 153 deletions
  1. 27 11
      config.json
  2. 1 1
      fouling_model_0922/data_mysql.py
  3. 14 1
      fouling_model_0922/predict.py
  4. 52 2
      get_api_data.py
  5. 71 66
      logging_system.py
  6. 161 42
      main_simple.py
  7. 101 30
      smart_monitor.py

+ 27 - 11
config.json

@@ -3,7 +3,7 @@
   "api": {
     "base_url": "http://120.55.44.4:8900",
     "callback_endpoint": "/api/dtgateway/v1/decision/data",
-    "API_History_URL" : "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data",
+    "API_History_URL": "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data",
     "jwt_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M"
   },
   "_comment_database": "MySQL数据库连接配置",
@@ -20,24 +20,40 @@
     "project_id": 92,
     "_project_id_desc": "SCADA项目ID"
   },
-  "_comment_cip_times": "各机组CIP时间状态",
+  "_comment_cip_times": "各机组分段CIP时间状态(支持一段、二段独立配置)",
   "cip_times": {
-    "RO1": {
-      "actual_time": "2025-09-11 10:30:00",
+    "RO1-一段": {
+      "actual_time": "2025-10-30 09:30:00",
       "predicted_time": ""
     },
-    "RO2": {
-      "actual_time": "2025-09-30 10:31:00",
+    "RO1-二段": {
+      "actual_time": "2025-10-30 10:30:00",
       "predicted_time": ""
     },
-    "_RO3_disabled": {
-      "actual_time": "2025-09-12 10:05:00",
+    "RO2-一段": {
+      "actual_time": "2025-09-30 10:30:00",
+      "predicted_time": ""
+    },
+    "RO2-二段": {
+      "actual_time": "2025-09-30 11:30:00",
+      "predicted_time": ""
+    },
+    "_RO3-一段_disabled": {
+      "actual_time": "",
+      "predicted_time": null
+    },
+    "_RO3-二段_disabled": {
+      "actual_time": "",
+      "predicted_time": null
+    },
+    "_RO4-一段_disabled": {
+      "actual_time": "",
       "predicted_time": null
     },
-    "_RO4_disabled": {
-      "actual_time": "2025-09-29 10:30:00",
+    "_RO4-二段_disabled": {
+      "actual_time": "",
       "predicted_time": null
     },
-    "_desc": "各机组CIP时间状态:actual_time=实际执行的CIP时间(修改此字段触发预测),predicted_time=模型预测的下次CIP时间(仅用于记录)"
+    "_desc": "各机组分段CIP时间状态:actual_time=实际执行的CIP时间(修改此字段触发预测),predicted_time=模型预测的下次CIP时间(仅用于记录)。支持独立配置每个段的CIP时间。"
   }
 }

+ 1 - 1
fouling_model_0922/data_mysql.py

@@ -97,7 +97,7 @@ def get_sensor_data(start_date=None):
         return pd.DataFrame()
 
     if data_origin.empty:
-        print("⚠️ 数据库未返回任何数据,无法进行后续处理。")
+        print("警告:数据库未返回任何数据,无法进行后续处理。")
         return pd.DataFrame()
 
     data_origin['index'] = pd.to_datetime(data_origin['h_time'])

+ 14 - 1
fouling_model_0922/predict.py

@@ -178,7 +178,16 @@ class Predictor:
                 outputs = self.model(inputs)
                 all_predictions.append(outputs.cpu().numpy())
 
-        predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
+        if not all_predictions:
+            print("警告:模型未产生任何预测!")
+            predictions = np.zeros((self.output_size, self.labels_num))
+        else:
+            predictions = np.concatenate(all_predictions, axis=0).reshape(-1, self.labels_num)
+        
+        # 诊断信息
+        print(f"预测数据形状: {predictions.shape}")
+        print(f"预测数据范围: [{np.nanmin(predictions):.4f}, {np.nanmax(predictions):.4f}]")
+        print(f"NaN数量: {np.isnan(predictions).sum()}")
         
         # 反标准化处理
         inverse_scaler = MinMaxScaler()
@@ -187,6 +196,10 @@ class Predictor:
         predictions = inverse_scaler.inverse_transform(predictions)
         predictions = np.clip(predictions, 0, None)
         
+        # 诊断信息(反标准化后)
+        print(f"反标准化后范围: [{np.nanmin(predictions):.4f}, {np.nanmax(predictions):.4f}]")
+        print(f"反标准化后NaN数量: {np.isnan(predictions).sum()}")
+        
         # 平滑处理
         predictions = self.smooth_predictions(predictions)
         self.test_start_date = (pd.to_datetime(df['index']).max() + timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")

+ 52 - 2
get_api_data.py

@@ -5,8 +5,19 @@ from datetime import datetime, timedelta
 
 # --- 1. 基础配置 ---
 # 配置日志记录器,方便调试和追踪
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+# 禁止传播到root logger,避免重复输出
+logger.propagate = False
+
+# 只有在logger没有handler时才添加(防止重复)
+if not logger.handlers:
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(logging.INFO)
+    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+    console_handler.setFormatter(formatter)
+    logger.addHandler(console_handler)
 
 
 # --- 2. 辅助函数:填充缺失的小时数据 ---
@@ -145,6 +156,45 @@ def get_sensor_data(end_date_str=None, API_BASE_URL=None, HEADERS=None):
     logger.info("正在填充缺失的小时数据以确保时间序列的完整性...")
     # pivot_df.columns.name = None
     final_df = fill_missing_hourly_data(pivot_df, start_date, end_date)
+    
+    # 步骤 7: 对 final_df 进行插值处理,消除所有 NaN 值
+    if not final_df.empty:
+        # 检查是否存在 NaN 值
+        nan_count_before = final_df.isna().sum().sum()
+        if nan_count_before > 0:
+            logger.info(f"检测到 {nan_count_before} 个 NaN 值,开始进行插值处理...")
+            
+            # 保存 'index' 列(时间列)
+            time_column = final_df['index'].copy() if 'index' in final_df.columns else None
+            
+            # 对数值列进行插值处理
+            numeric_columns = final_df.select_dtypes(include=['float64', 'int64']).columns
+            
+            if len(numeric_columns) > 0:
+                # 1. 线性插值(适合时间序列数据)
+                final_df[numeric_columns] = final_df[numeric_columns].interpolate(method='linear', limit_direction='both')
+                
+                # 2. 前向填充(处理开头的 NaN)
+                final_df[numeric_columns] = final_df[numeric_columns].ffill()
+                
+                # 3. 后向填充(处理末尾的 NaN)
+                final_df[numeric_columns] = final_df[numeric_columns].bfill()
+                
+                # 4. 如果仍有 NaN(整列为空的情况),用0填充
+                final_df[numeric_columns] = final_df[numeric_columns].fillna(0)
+            
+            # 恢复 'index' 列(确保时间列不被修改)
+            if time_column is not None:
+                final_df['index'] = time_column
+            
+            nan_count_after = final_df.isna().sum().sum()
+            logger.info(f"插值处理完成,剩余 NaN 值: {nan_count_after} 个")
+            
+            if nan_count_after > 0:
+                logger.warning(f"警告: 仍有 {nan_count_after} 个 NaN 值未能填充")
+        else:
+            logger.info("数据中没有 NaN 值,无需插值处理")
+    
     # final_df = final_df.sort_values('index').reset_index(drop=True)
     return final_df
 

+ 71 - 66
logging_system.py

@@ -33,17 +33,39 @@ class CIPAnalysisLogger:
     - 生成分析图表和报告
     """
     
-    def __init__(self, log_dir="analysis_logs"):
+    def __init__(self, log_dir="analysis_logs", unit_filter=None):
         """
         初始化日志记录器
         
         Args:
             log_dir: str,日志目录路径,默认"analysis_logs"
+            unit_filter: str,机组过滤器,如'RO1',用于目录命名
+        
+        目录结构(优化后):
+            analysis_logs/
+                CIP_RO1_20251105_155542/  # 类别_机组_时间
+                    CIP_Analysis_20251105_155542.log
+                    data/
+                    plots/
+                    reports/
+                CIP_ALL_20251105_160000/  # 全机组分析
         """
-        self.log_dir = Path(log_dir)
+        # 生成会话ID和时间戳
+        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.session_id = f"CIP_Analysis_{self.timestamp}"
+        
+        # 创建按类别和时间分组的会话目录
+        base_log_dir = Path(log_dir)
+        base_log_dir.mkdir(exist_ok=True)
+        
+        # 目录命名:CIP_机组_时间
+        unit_name = unit_filter if unit_filter else "ALL"
+        dir_name = f"CIP_{unit_name}_{self.timestamp}"
+        
+        self.log_dir = base_log_dir / dir_name
         self.log_dir.mkdir(exist_ok=True)
         
-        # 创建子目录结构
+        # 创建子目录结构(放在会话目录下)
         self.data_dir = self.log_dir / "data"
         self.plots_dir = self.log_dir / "plots" 
         self.reports_dir = self.log_dir / "reports"
@@ -51,10 +73,6 @@ class CIPAnalysisLogger:
         for dir_path in [self.data_dir, self.plots_dir, self.reports_dir]:
             dir_path.mkdir(exist_ok=True)
         
-        # 生成会话ID
-        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-        self.session_id = f"CIP_Analysis_{self.timestamp}"
-        
         # 初始化日志
         self.setup_logging()
         
@@ -73,7 +91,7 @@ class CIPAnalysisLogger:
             "performance_metrics": {}
         }
         
-        self.logger.info(f"开始CIP分析会话: {self.session_id}")
+        self.logger.info(f"CIP分析会话: {self.session_id}, 目录: {self.log_dir}")
     
     def setup_logging(self):
         """设置日志配置"""
@@ -83,6 +101,15 @@ class CIPAnalysisLogger:
         self.logger = logging.getLogger(self.session_id)
         self.logger.setLevel(logging.INFO)
         
+        # 禁止日志传播到父logger,避免重复输出
+        self.logger.propagate = False
+        
+        # 清除已有的所有处理器(防止重复添加)
+        if self.logger.handlers:
+            for handler in self.logger.handlers[:]:
+                handler.close()
+                self.logger.removeHandler(handler)
+        
         # 创建文件处理器
         file_handler = logging.FileHandler(log_file, encoding='utf-8')
         file_handler.setLevel(logging.INFO)
@@ -101,9 +128,8 @@ class CIPAnalysisLogger:
         console_handler.setFormatter(formatter)
         
         # 添加处理器
-        if not self.logger.handlers:
-            self.logger.addHandler(file_handler)
-            self.logger.addHandler(console_handler)
+        self.logger.addHandler(file_handler)
+        self.logger.addHandler(console_handler)
     
     def log_input_parameters(self, strategy, start_date, prediction_start_date=None):
         """
@@ -123,11 +149,8 @@ class CIPAnalysisLogger:
         
         self.analysis_data["input_parameters"] = params
         
-        self.logger.info("输入参数:")
-        self.logger.info(f"  策略: {strategy}")
-        self.logger.info(f"  起始时间: {start_date}")
-        if prediction_start_date:
-            self.logger.info(f"  预测起始时间: {prediction_start_date}")
+        pred_start = f", 预测起始: {prediction_start_date}" if prediction_start_date else ""
+        self.logger.info(f"参数 - 策略: {strategy}, 起始: {start_date}{pred_start}")
     
     def log_prediction_data(self, all_data):
         """
@@ -150,15 +173,10 @@ class CIPAnalysisLogger:
             
             self.analysis_data["prediction_data"] = data_info
             
-            self.logger.info("预测数据概况:")
-            self.logger.info(f"  数据形状: {data_info['shape']}")
-            self.logger.info(f"  时间范围: {data_info['time_range']['start']} 到 {data_info['time_range']['end']}")
-            self.logger.info(f"  总数据点: {data_info['data_points']}")
-            
             # 保存数据到文件
             data_file = self.data_dir / f"{self.session_id}_prediction_data.csv"
             all_data.to_csv(data_file)
-            self.logger.info(f"  数据已保存: {data_file}")
+            self.logger.info(f"预测数据 - 形状: {data_info['shape']}, 点数: {data_info['data_points']}, 已保存: {data_file.name}")
             
         except Exception as e:
             self.logger.error(f"记录预测数据失败: {e}")
@@ -172,9 +190,8 @@ class CIPAnalysisLogger:
         """
         self.analysis_data["unit_days"] = unit_days_dict
         
-        self.logger.info("各机组预测天数:")
-        for unit_id, days in unit_days_dict.items():
-            self.logger.info(f"  RO{unit_id}: {days}天")
+        days_str = ", ".join([f"RO{uid}:{days}天" for uid, days in unit_days_dict.items()])
+        self.logger.info(f"预测天数 - {days_str}")
     
     def log_unit_analysis_start(self, unit_id, predict_days):
         """
@@ -184,8 +201,7 @@ class CIPAnalysisLogger:
             unit_id: int,机组ID
             predict_days: int,预测天数
         """
-        self.logger.info(f"\n开始分析机组 RO{unit_id}")
-        self.logger.info(f"  预测周期: {predict_days}天")
+        self.logger.info(f"[RO{unit_id}] 开始分析, 预测周期: {predict_days}天")
         
         if unit_id not in self.analysis_data["unit_analysis"]:
             self.analysis_data["unit_analysis"][unit_id] = {}
@@ -214,9 +230,8 @@ class CIPAnalysisLogger:
             
             self.analysis_data["unit_analysis"][unit_id]["pressure_data"] = unit_data
             
-            self.logger.info(f"  找到RO{unit_id}压差列: {len(pressure_columns)}个")
-            for col in pressure_columns:
-                self.logger.info(f"    {col}")
+            cols_str = ", ".join(pressure_columns)
+            self.logger.info(f"[RO{unit_id}] 压差列: {len(pressure_columns)}个 ({cols_str})")
             
             # 保存数据文件
             unit_data_file = self.data_dir / f"{self.session_id}_RO{unit_id}_pressure_data.csv"
@@ -227,13 +242,13 @@ class CIPAnalysisLogger:
     
     def log_cip_analysis_result(self, unit_id, column, optimal_time, analysis):
         """
-        记录CIP分析结果
+        记录CIP分析结果(增强版,包含诊断信息)
         
         Args:
             unit_id: int,机组ID
             column: str,压差列名
             optimal_time: pd.Timestamp,最优CIP时间
-            analysis: dict,分析结果
+            analysis: dict,分析结果(可能包含诊断信息)
         """
         try:
             if "cip_results" not in self.analysis_data["unit_analysis"][unit_id]:
@@ -252,7 +267,18 @@ class CIPAnalysisLogger:
             if optimal_time:
                 self.logger.info(f"  {column}: {optimal_time} (第{analysis['delay_days']}天, k={analysis['best_k']:.6f})")
             else:
-                self.logger.info(f"  {column}: 未找到CIP时机 - {analysis.get('error', '未知原因')}")
+                error_msg = analysis.get('error', '未知原因')
+                self.logger.info(f"  {column}: 未找到CIP时机 - {error_msg}")
+                
+                # 记录详细诊断信息
+                if 'hint' in analysis:
+                    self.logger.info(f"    提示: {analysis['hint']}")
+                if 'valid_k_count' in analysis:
+                    self.logger.info(f"    有效k值数量: {analysis['valid_k_count']}")
+                if 'rising_periods_count' in analysis:
+                    self.logger.info(f"    上升趋势段数: {analysis['rising_periods_count']}")
+                if 'data_days' in analysis:
+                    self.logger.info(f"    数据覆盖天数: {analysis['data_days']}天")
                 
         except Exception as e:
             self.logger.error(f"记录CIP分析结果失败: {e}")
@@ -275,8 +301,7 @@ class CIPAnalysisLogger:
             
             self.analysis_data["unit_analysis"][unit_id]["strategy_result"] = strategy_result
             
-            self.logger.info(f"RO{unit_id}最优CIP时机: {optimal_time}")
-            self.logger.info(f"  策略: {strategy_desc}")
+            self.logger.info(f"[RO{unit_id}] 最优CIP时机: {optimal_time}, 策略: {strategy_desc}")
             
         except Exception as e:
             self.logger.error(f"记录RO{unit_id}策略结果失败: {e}")
@@ -301,15 +326,12 @@ class CIPAnalysisLogger:
             
             self.analysis_data["final_results"] = final_results
             
-            self.logger.info("\n最终分析结果:")
-            for result in final_results:
-                self.logger.info(f"  {result['unit']}: {result['cip_time'] or 'N/A'}")
-                self.logger.info(f"    策略: {result['strategy_description']}")
-            
             # 保存结果文件
             result_file = self.data_dir / f"{self.session_id}_final_results.csv"
             result_df.to_csv(result_file, index=False, encoding='utf-8')
-            self.logger.info(f"  结果已保存: {result_file}")
+            
+            results_summary = ", ".join([f"{r['unit']}: {r['cip_time'] or 'N/A'}" for r in final_results])
+            self.logger.info(f"最终结果 - {results_summary}, 已保存: {result_file.name}")
             
         except Exception as e:
             self.logger.error(f"记录最终结果失败: {e}")
@@ -325,8 +347,6 @@ class CIPAnalysisLogger:
             unit_days_dict: dict,各机组预测天数
         """
         try:
-            self.logger.info("\n生成分析图表...")
-            
             # 创建压差趋势总览图
             fig, axes = plt.subplots(2, 2, figsize=(20, 12))
             fig.suptitle(f'RO膜污染分析总览 - {self.session_id}', fontsize=16, fontweight='bold')
@@ -371,10 +391,10 @@ class CIPAnalysisLogger:
             plt.savefig(plot_file, dpi=300, bbox_inches='tight')
             plt.close()
             
-            self.logger.info(f"  压差趋势图已保存: {plot_file}")
-            
             # 创建机组对比图
-            self._create_unit_comparison_plot(unit_days_dict)
+            comparison_plot = self._create_unit_comparison_plot(unit_days_dict)
+            
+            self.logger.info(f"分析图表 - 已保存: {plot_file.name}, {comparison_plot}")
             
         except Exception as e:
             self.logger.error(f"创建分析图表失败: {e}")
@@ -442,10 +462,11 @@ class CIPAnalysisLogger:
             plt.savefig(comparison_file, dpi=300, bbox_inches='tight')
             plt.close()
             
-            self.logger.info(f"  机组对比图已保存: {comparison_file}")
+            return comparison_file.name
             
         except Exception as e:
             self.logger.error(f"创建机组对比图失败: {e}")
+            return None
     
     def generate_analysis_report(self):
         """
@@ -457,8 +478,6 @@ class CIPAnalysisLogger:
         3. 输出会话总结
         """
         try:
-            self.logger.info("\n生成分析报告...")
-            
             # 记录结束时间
             self.analysis_data["session_info"]["end_time"] = datetime.now().isoformat()
             
@@ -473,8 +492,7 @@ class CIPAnalysisLogger:
             with open(html_file, 'w', encoding='utf-8') as f:
                 f.write(html_report)
             
-            self.logger.info(f"  JSON数据已保存: {json_file}")
-            self.logger.info(f"  HTML报告已保存: {html_file}")
+            self.logger.info(f"分析报告 - 已保存: {json_file.name}, {html_file.name}")
             
             # 生成会话总结
             self._log_session_summary()
@@ -606,30 +624,17 @@ class CIPAnalysisLogger:
         end_time = datetime.fromisoformat(session_info["end_time"])
         duration = end_time - start_time
         
-        self.logger.info(f"\n分析会话总结:")
-        self.logger.info(f"  会话ID: {session_info['session_id']}")
-        self.logger.info(f"  总耗时: {duration.total_seconds():.2f}秒")
-        
         # 统计分析结果
         final_results = self.analysis_data.get("final_results", [])
         success_count = sum(1 for r in final_results if r["cip_time"])
         total_count = len(final_results)
-        
-        self.logger.info(f"  分析机组: {total_count}个")
-        self.logger.info(f"  成功预测: {success_count}个")
-        if total_count > 0:
-            self.logger.info(f"  成功率: {success_count/total_count*100:.1f}%")
-        else:
-            self.logger.info(f"  成功率: N/A")
+        success_rate = f"{success_count/total_count*100:.1f}%" if total_count > 0 else "N/A"
         
         # 统计生成的文件
         data_files = len(list(self.data_dir.glob(f"{self.session_id}_*.csv")))
         plot_files = len(list(self.plots_dir.glob(f"{self.session_id}_*.png")))
         
-        self.logger.info(f"  生成数据文件: {data_files}个")
-        self.logger.info(f"  生成图表文件: {plot_files}个")
-        
-        self.logger.info(f"分析会话结束: {self.session_id}")
+        self.logger.info(f"会话结束 - ID: {session_info['session_id']}, 耗时: {duration.total_seconds():.2f}秒, 机组: {total_count}个, 成功: {success_count}个({success_rate}), 文件: {data_files}数据+{plot_files}图表")
     
     def close(self):
         """关闭日志记录器"""

+ 161 - 42
main_simple.py

@@ -4,15 +4,23 @@ RO膜污染监控与CIP预测 - 基于预测数据的最优时机分析
 
 核心功能:分析RO膜压差预测数据,计算最优CIP清洗时机
 
-CIP时机选择策略:
-1. 最早时机策略:一段或二段任一需要CIP时即触发
-2. 最晚时机策略:等待所有段都需要CIP时触发
-3. 加权平均策略:综合两段污染程度,污染严重段权重更大
-4. 污染严重程度策略:基于k值最大的段决策
+CIP输出模式:
+1. 分段输出模式(separate_stages=True,默认):
+   - 分别输出每个段的独立CIP时机
+   - 例如:RO1-一段: 2025-10-15 10:00:00, RO1-二段: 2025-10-20 15:00:00
+   
+2. 合并输出模式(separate_stages=False,保留以备后用):
+   - 使用策略合并输出一个CIP时机
+   - 策略1: 最早时机策略 - 一段或二段任一需要CIP时即触发
+   - 策略2: 最晚时机策略 - 等待所有段都需要CIP时触发
+   - 策略3: 加权平均策略 - 综合两段污染程度,污染严重段权重更大
+   - 策略4: 污染严重程度策略 - 基于k值最大的段决策
 
 使用方法:
-    main(strategy=3)  # 使用策略3
-    main(strategy=1, start_date='2025-08-26 00:00:00')  # 指定策略和时间
+    main()  # 使用默认参数(分段输出模式)
+    main(separate_stages=False, strategy=3)  # 使用合并模式+策略3
+    main(start_date='2025-08-26 00:00:00')  # 指定起始时间
+    main(unit_filter='RO1')  # 只分析RO1
 """
 
 import pandas as pd
@@ -290,13 +298,22 @@ class OptimalCIPPredictor:
         
         # 检查:k值数量是否足够
         if valid_k_count < 10:
-            return None, {"error": "有效k值数量不足"}
+            return None, {
+                "error": "有效k值数量不足",
+                "valid_k_count": valid_k_count,
+                "required": 10
+            }
         
         # 步骤2:识别连续上升时间段
         rising_periods = self.find_continuous_rising_periods(k_values)
         
         if not rising_periods:
-            return None, {"error": "未发现连续上升趋势"}
+            return None, {
+                "error": "未发现连续上升趋势",
+                "valid_k_count": valid_k_count,
+                "min_continuous_rising": self.min_continuous_rising,
+                "hint": "k值没有持续上升趋势,可能膜污染较稳定"
+            }
         
         # 步骤3:应用时间约束,筛选有效时间段
         min_delay_time = pressure_series.index[0] + timedelta(days=self.min_delay_days)
@@ -318,7 +335,13 @@ class OptimalCIPPredictor:
                     valid_periods.append((start_idx, end_idx, duration))
         
         if not valid_periods:
-            return None, {"error": f"无满足时间约束的上升趋势(需>={self.min_delay_days}天后)"}
+            return None, {
+                "error": f"无满足时间约束的上升趋势(需>={self.min_delay_days}天后)",
+                "rising_periods_count": len(rising_periods),
+                "min_delay_days": self.min_delay_days,
+                "data_days": (pressure_series.index[-1] - pressure_series.index[0]).days,
+                "hint": f"发现{len(rising_periods)}个上升趋势,但都在前{self.min_delay_days}天内"
+            }
         
         # 步骤4:在有效时间段内寻找k值最大的点
         best_time = None
@@ -337,7 +360,8 @@ class OptimalCIPPredictor:
         analysis_result = {
             "success": True,
             "delay_days": (best_time - pressure_series.index[0]).days,
-            "best_k": float(best_k)
+            "best_k": float(best_k),
+            "valid_periods_count": len(valid_periods)
         }
         
         return best_time, analysis_result
@@ -405,6 +429,23 @@ def select_optimal_cip_strategy_4(cip_results):
     max_k_result = max(cip_results, key=lambda x: x['k_value'])
     return max_k_result['time'], f"污染严重程度策略 - {max_k_result['column']} (k值={max_k_result['k_value']:.6f}, 第{max_k_result['delay_days']}天)"
 
+def extract_stage_name(column_name):
+    """
+    从列名中提取段号信息
+    
+    Args:
+        column_name: str,列名,例如'C.M.RO1_DB@DPT_1_pred'或'C.M.RO2_DB@DPT_2_pred'
+    
+    Returns:
+        str: 段号名称,例如'一段'或'二段'
+    """
+    if 'DPT_1' in column_name:
+        return '一段'
+    elif 'DPT_2' in column_name:
+        return '二段'
+    else:
+        return '未知段'
+
 def select_optimal_cip_time(cip_results, strategy=1):
     """
     根据指定策略选择最优CIP时机
@@ -433,7 +474,7 @@ def select_optimal_cip_time(cip_results, strategy=1):
     
     return strategy_map[strategy](cip_results)
 
-def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
+def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None, separate_stages=True):
     """
     分析RO机组的最优CIP时间
     
@@ -450,12 +491,15 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
             4: 污染严重程度策略
         start_date: str,预测起始时间,格式'YYYY-MM-DD HH:MM:SS',默认使用当前时间
         unit_filter: str,指定分析的机组,如'RO1',默认分析所有机组
+        separate_stages: bool,是否分段输出一段和二段的独立CIP时机
+            True: 分别输出每个段的CIP时机(例如:RO1-一段、RO1-二段)
+            False: 使用策略合并输出一个CIP时机(例如:RO1)
     
     Returns:
         pd.DataFrame: 包含机组类型、CIP时机、策略说明的结果表
     """
-    # 初始化日志记录器
-    logger = CIPAnalysisLogger()
+    # 初始化日志记录器(传入unit_filter用于目录命名)
+    logger = CIPAnalysisLogger(unit_filter=unit_filter)
     
     try:
         # 获取预测数据
@@ -535,14 +579,25 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
                 pressure_series.name = column
                 
                 # 数据点数检查:至少需要30天数据
+                data_days = len(pressure_series) / 24
+                print(f"  {column}: 数据点数={len(pressure_series)}, 约{data_days:.1f}天")
+                
                 if len(pressure_series) < 30 * 24:
+                    print(f"    [跳过] 数据不足30天")
+                    logger.log_cip_analysis_result(unit_id, column, None, 
+                        {"error": f"数据不足: {len(pressure_series)}点 < 720点(30天)"})
                     continue
                 
                 try:
                     # 寻找最优CIP时机
                     optimal_time, analysis = predictor.find_optimal_cip_time(pressure_series)
                     
-                    # 记录分析结果
+                    # 记录分析结果(带详细诊断信息)
+                    if optimal_time:
+                        print(f"    [成功] 找到CIP时机: {optimal_time.strftime('%Y-%m-%d %H:%M')}")
+                    else:
+                        print(f"    [失败] 未找到CIP时机: {analysis.get('error', '未知原因')}")
+                    
                     logger.log_cip_analysis_result(unit_id, column, optimal_time, analysis)
                     
                     if optimal_time:
@@ -554,29 +609,62 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
                         })
                         
                 except Exception as e:
+                    print(f"    [异常] 分析失败: {str(e)}")
                     logger.log_cip_analysis_result(unit_id, column, None, {"error": str(e)})
             
-            # 根据策略选择最优CIP时机
+            # 根据separate_stages参数决定输出方式
             if cip_results:
-                optimal_time, strategy_desc = select_optimal_cip_time(cip_results, strategy)
-                results.append({
-                    '机组类型': f"RO{unit_id}",
-                    'CIP时机': optimal_time,
-                    '策略说明': strategy_desc
-                })
-                print(f"RO{unit_id} CIP时机: {optimal_time.strftime('%Y-%m-%d %H:%M:%S')}")
-                
-                logger.log_unit_strategy_result(unit_id, optimal_time, strategy_desc)
+                if separate_stages:
+                    # 方案1:分段输出每个段的独立CIP时机
+                    print(f"\n[RO{unit_id}] 分段输出模式:")
+                    for cip_result in cip_results:
+                        stage_name = extract_stage_name(cip_result['column'])
+                        unit_stage_name = f"RO{unit_id}-{stage_name}"
+                        stage_desc = f"独立分析 - {cip_result['column']} (k值={cip_result['k_value']:.6f}, 第{cip_result['delay_days']}天)"
+                        
+                        results.append({
+                            '机组类型': unit_stage_name,
+                            'CIP时机': cip_result['time'],
+                            '策略说明': stage_desc
+                        })
+                        print(f"  {unit_stage_name}: {cip_result['time'].strftime('%Y-%m-%d %H:%M:%S')}")
+                        
+                        # 记录日志
+                        logger.log_unit_strategy_result(unit_id, cip_result['time'], f"{stage_name} - {stage_desc}")
+                else:
+                    # 原有逻辑:使用策略合并输出一个CIP时机(保留以备后用)
+                    optimal_time, strategy_desc = select_optimal_cip_time(cip_results, strategy)
+                    results.append({
+                        '机组类型': f"RO{unit_id}",
+                        'CIP时机': optimal_time,
+                        '策略说明': strategy_desc
+                    })
+                    print(f"RO{unit_id} CIP时机: {optimal_time.strftime('%Y-%m-%d %H:%M:%S')}")
+                    
+                    logger.log_unit_strategy_result(unit_id, optimal_time, strategy_desc)
             else:
                 # 如果没找到最优CIP时机,使用预测天数的最后时间作为CIP时机
                 fallback_time = end_time
                 fallback_desc = f"使用预测终点时间 (第{predict_days}天)"
-                results.append({
-                    '机组类型': f"RO{unit_id}",
-                    'CIP时机': fallback_time,
-                    '策略说明': fallback_desc
-                })
-                print(f"RO{unit_id} CIP时机: {fallback_time.strftime('%Y-%m-%d %H:%M:%S')} (备用策略)")
+                
+                if separate_stages:
+                    # 分段模式:为每个段都输出备用时机
+                    print(f"\n[RO{unit_id}] 无有效CIP时机,使用备用策略")
+                    for stage_num in ['一段', '二段']:
+                        results.append({
+                            '机组类型': f"RO{unit_id}-{stage_num}",
+                            'CIP时机': fallback_time,
+                            '策略说明': fallback_desc
+                        })
+                        print(f"  RO{unit_id}-{stage_num}: {fallback_time.strftime('%Y-%m-%d %H:%M:%S')} (备用策略)")
+                else:
+                    # 合并模式:输出一个备用时机
+                    results.append({
+                        '机组类型': f"RO{unit_id}",
+                        'CIP时机': fallback_time,
+                        '策略说明': fallback_desc
+                    })
+                    print(f"RO{unit_id} CIP时机: {fallback_time.strftime('%Y-%m-%d %H:%M:%S')} (备用策略)")
                 
                 logger.log_unit_strategy_result(unit_id, fallback_time, fallback_desc)
     
@@ -604,7 +692,7 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
         # 确保日志记录器正确关闭
         logger.close()
 
-def main(strategy=3, start_date=None, unit_filter=None):
+def main(strategy=3, start_date=None, unit_filter=None, separate_stages=True, send_callback=True):
     """
     主执行函数
     
@@ -618,20 +706,31 @@ def main(strategy=3, start_date=None, unit_filter=None):
             4: 污染严重程度策略
         start_date: str,预测起始时间,格式'YYYY-MM-DD HH:MM:SS',默认None(使用当前时间)
         unit_filter: str,指定预测的机组,如'RO1',默认None(预测所有机组)
+        separate_stages: bool,是否分段输出一段和二段的独立CIP时机,默认True
+            True: 分别输出每个段的CIP时机(例如:RO1-一段、RO1-二段)
+            False: 使用策略合并输出一个CIP时机(例如:RO1)
+        send_callback: bool,是否发送回调,默认True
+            当从 smart_monitor 调用时应设为 False,避免重复发送
     
     Returns:
         pd.DataFrame: 分析结果
     
     示例:
-        result_df = main()  # 使用默认参数
+        result_df = main()  # 使用默认参数(分段输出)
+        result_df = main(separate_stages=False)  # 使用策略合并
         result_df = main(start_date='2025-07-01 00:00:00')  # 指定时间
         result_df = main(strategy=1, unit_filter='RO1')  # 指定策略和机组
     """
     # 执行分析 
-    result_df = analyze_ro_unit_cip_timing(strategy=strategy, start_date=start_date, unit_filter=unit_filter)
-    
-    # 发送回调
-    if config and not result_df.empty:
+    result_df = analyze_ro_unit_cip_timing(
+        strategy=strategy, 
+        start_date=start_date, 
+        unit_filter=unit_filter,
+        separate_stages=separate_stages
+    )
+    
+    # 发送回调(如果启用)
+    if send_callback and config and not result_df.empty:
         callback_success = send_decision_to_callback(result_df)
         if not callback_success:
             print(" 回调发送失败")
@@ -663,10 +762,26 @@ def send_decision_to_callback(decision_data):
         if isinstance(decision_data, pd.DataFrame):
             for _, row in decision_data.iterrows():
                 if pd.notna(row["CIP时机"]):
+                    # 从机组类型中提取段号信息
+                    unit_type = row["机组类型"]
+                    
+                    # 判断是一段还是二段
+                    if "一段" in unit_type:
+                        stage_num = 1
+                    elif "二段" in unit_type:
+                        stage_num = 2
+                    else:
+                        stage_num = 1  # 默认为1段(兼容旧格式)
+                    
+                    # 提取纯粹的机组编号(去掉"-一段"或"-二段")
+                    # 例如:"RO1-一段" -> "RO1","RO2-二段" -> "RO2"
+                    unit_name = unit_type.split('-')[0] if '-' in unit_type else unit_type
+                    
                     callback_list.append({
-                        "type": row["机组类型"],
+                        "type": unit_name,
                         "project_id": project_id,
-                        "ctime": row["CIP时机"].strftime("%Y-%m-%d %H:%M:%S")
+                        "ctime": row["CIP时机"].strftime("%Y-%m-%d %H:%M:%S"),
+                        "ceb_backwash_frequency": stage_num
                     })
         else:
             callback_list = [decision_data]
@@ -679,7 +794,6 @@ def send_decision_to_callback(decision_data):
         payload = {
             "list": callback_list
         }
-        
         # 发送HTTP请求(带重试机制)
         max_retries = 3
         retry_interval = 10
@@ -705,5 +819,10 @@ def send_decision_to_callback(decision_data):
 
 if __name__ == '__main__':
     # 示例调用
-    # main()  # 使用当前时间
-    main(start_date='2025-08-26 00:00:00', unit_filter='RO1')  # 使用历史时间
+    
+    # 方式1:分段输出模式(默认,推荐)
+    # 分别输出RO1-一段、RO1-二段的CIP时机
+    main(start_date='2025-10-26 00:00:00', unit_filter='RO1')
+    
+    # 方式2:合并输出模式(使用策略合并,保留以备后用)
+    # main(start_date='2025-08-26 00:00:00', unit_filter='RO1', separate_stages=False, strategy=3)

+ 101 - 30
smart_monitor.py

@@ -1,11 +1,11 @@
 # -*- coding: utf-8 -*-
 """
-CIP监控系统 - 配置文件监控模式
+CIP监控系统 - 配置文件监控模式(支持分段监控)
 
 功能:
-1. 监控config.json中actual_time字段变化
+1. 监控config.json中各机组段(一段、二段)的actual_time字段变化
 2. 检测到变化且actual_time < 当前时间时,基于actual_time执行CIP预测
-3. 将预测结果保存到predicted_time(仅用于记录)
+3. 将预测结果保存到对应段的predicted_time(仅用于记录)
 4. 通过回调接口发送预测结果到决策系统
 5. 继续等待actual_time的下次更新
 
@@ -15,6 +15,12 @@ CIP监控系统 - 配置文件监控模式
 
 工作流程:
 修改actual_time(且 < 当前时间)→ 检测变化 → 执行预测 → 保存predicted_time → 发送回调 → 继续监控actual_time
+
+分段监控说明:
+- 支持独立监控每个段(RO1-一段、RO1-二段等)
+- 每个段有独立的actual_time和predicted_time
+- 可以为不同段设置不同的CIP时间
+- 预测时会分别输出每个段的CIP时机
 """
 
 import os
@@ -32,12 +38,21 @@ from main_simple import send_decision_to_callback
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
 
+# 禁止传播到root logger,避免重复输出
+logger.propagate = False
+
 # 日志输出格式
 formatter = logging.Formatter(
     '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
     datefmt='%Y-%m-%d %H:%M:%S'
 )
 
+# 清除已有的处理器(防止重复添加)
+if logger.handlers:
+    for handler in logger.handlers[:]:
+        handler.close()
+        logger.removeHandler(handler)
+
 # 文件日志处理器,单个文件最大5MB,保留3个备份
 file_handler = RotatingFileHandler('smart_monitor.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
 file_handler.setFormatter(formatter)
@@ -52,10 +67,10 @@ logger.addHandler(console_handler)
 
 class SmartCIPMonitor:
     """
-    CIP监控器
+    CIP监控器(支持分段监控)
     
     核心功能:
-    1. 监控config.json中各机组的actual_time字段
+    1. 监控config.json中各机组的actual_time字段
     2. 检测到时间变化后,自动触发CIP预测
     3. 保存predicted_time到配置文件(仅用于记录)
     4. 发送预测结果到回调接口
@@ -65,8 +80,8 @@ class SmartCIPMonitor:
         config: 配置文件内容
         running: 监控运行状态
         monitor_thread: 监控线程
-        unit_names: 机组名称列表
-        last_seen_actual_times: 记录上次看到的各机组actual_time,用于检测变化
+        unit_names: 机组名称列表(如['RO1-一段', 'RO1-二段', ...])
+        last_seen_actual_times: 记录上次看到的各机组actual_time,用于检测变化
     """
     
     def __init__(self):
@@ -82,10 +97,16 @@ class SmartCIPMonitor:
         self.config = None
         self.running = False
         self.monitor_thread = None
-        self.unit_names = ['RO1', 'RO2', 'RO3', 'RO4']
+        # 支持分段监控:每个机组包含一段和二段
+        self.unit_names = [
+            'RO1-一段', 'RO1-二段',
+            'RO2-一段', 'RO2-二段',
+            'RO3-一段', 'RO3-二段',
+            'RO4-一段', 'RO4-二段'
+        ]
         
         # 记录上次看到的actual_time,用于检测变化
-        # 格式:{'RO1': '2025-12-01 12:00:00', 'RO2': '...'}
+        # 格式:{'RO1-一段': '2025-12-01 12:00:00', 'RO1-二段': '...', ...}
         self.last_seen_actual_times = {}
         
         if not self.load_config():
@@ -125,14 +146,15 @@ class SmartCIPMonitor:
     
     def initialize_last_seen_times(self):
         """
-        初始化监控基准时间
+        初始化监控基准时间(支持分段)
         
         功能:
-        读取config.json中各机组当前的actual_time作为监控基准
+        读取config.json中各机组当前的actual_time作为监控基准
         但是:只记录未来的时间,已过期的时间不记录,以便首次检查时能触发预测
         
         说明:
         - last_seen_actual_times用于防止重复处理相同的时间
+        - 支持分段配置(如'RO1-一段', 'RO1-二段')
         - 支持新旧配置格式兼容
         - 已过期的时间(< 当前时间)不记录,确保首次检查时能被检测为"变化"
         """
@@ -186,20 +208,21 @@ class SmartCIPMonitor:
     
     def check_for_changes(self):
         """
-        检查actual_time是否有变化且小于当前时间
+        检查actual_time是否有变化且小于当前时间(支持分段)
         
         功能:
         比对当前配置文件中的actual_time和上次记录的值
         发现不同且小于当前时间则判定为有变化,需要触发预测
         
         Returns:
-            list: 有变化的机组列表,格式 [(unit_name, new_actual_time), ...]
-                  例如:[('RO1', '2025-10-12 12:00:00'), ('RO3', '2025-10-15 08:00:00')]
+            list: 有变化的机组列表,格式 [(unit_name, new_actual_time), ...]
+                  例如:[('RO1-一段', '2025-10-12 12:00:00'), ('RO1-二段', '2025-10-15 08:00:00')]
         
         说明:
         - 只有actual_time变化才会触发
         - 必须满足 actual_time < 当前时间
         - predicted_time的变化不会触发
+        - 支持分段独立监控(每个段可以有不同的actual_time)
         """
         changes = []
         current_time = datetime.now()
@@ -242,41 +265,82 @@ class SmartCIPMonitor:
         
         return changes
     
+    def extract_unit_base_name(self, unit_name):
+        """
+        从分段机组名称中提取基础机组号
+        
+        Args:
+            unit_name: str,分段机组名称,如'RO1-一段'
+        
+        Returns:
+            str: 基础机组号,如'RO1'
+        
+        示例:
+            'RO1-一段' -> 'RO1'
+            'RO2-二段' -> 'RO2'
+        """
+        if '-' in unit_name:
+            return unit_name.split('-')[0]
+        return unit_name
+    
     def predict_and_update_unit(self, unit_name, start_date_str):
         """
-        预测单个机组并保存结果到配置
+        预测并只处理触发段的结果(严格控制:谁触发只处理谁)
         
         功能:
-        1. 调用main_simple.py中的预测函数
-        2. 提取预测结果中的CIP时机
-        3. 保存predicted_time到配置文件(仅用于记录,不参与任何计算)
-        4. 发送预测结果到回调接口
+        1. 调用预测函数(会预测整个机组的所有段,这是模型限制)
+        2. 只提取触发段的结果(其他段的结果丢弃)
+        3. 只保存触发段的predicted_time到配置文件
+        4. 只发送触发段的回调
+        5. 其他未触发的段等它们自己触发后再处理
         
         Args:
-            unit_name: str,机组名称,如'RO1', 'RO2', 'RO3', 'RO4'
+            unit_name: str,触发的机组段名称,如'RO1-一段', 'RO2-二段'
             start_date_str: str,预测起始时间(即actual_time),格式'YYYY-MM-DD HH:MM:SS'
             
         Returns:
             bool: 预测成功返回True,失败返回False
             
         说明:
+        - 严格的分段控制:只处理触发段,其他段不保存、不发送
         - predicted_time仅用于记录,不影响下次预测的触发
         - 下次预测只由actual_time的变化触发
+        
+        示例流程:
+        1. RO1-一段触发 → 预测RO1 → 只处理RO1-一段 → RO1-二段丢弃
+        2. 稍后RO1-二段触发 → 预测RO1 → 只处理RO1-二段 → RO1-一段丢弃(已处理过)
         """
         try:
             logger.info(f"[{unit_name}] 开始预测 (基于 {start_date_str})")
             
-            # 步骤1:调用预测逻辑
-            result_df = run_cip_analysis(strategy=3, start_date=start_date_str, unit_filter=unit_name)
+            # 步骤1:提取基础机组号(用于unit_filter参数)
+            base_unit_name = self.extract_unit_base_name(unit_name)
+            
+            # 步骤2:调用预测逻辑(会预测整个机组的所有段,这是模型限制)
+            # 禁用 main() 内部的回调发送,由 smart_monitor 统一管理
+            logger.info(f"[{unit_name}] 执行预测 (unit_filter={base_unit_name},会预测整个机组)")
+            result_df = run_cip_analysis(
+                strategy=3, 
+                start_date=start_date_str, 
+                unit_filter=base_unit_name,
+                separate_stages=True,  # 明确指定分段输出
+                send_callback=False  # 禁用内部回调,避免重复发送
+            )
             
             if result_df.empty:
                 logger.warning(f"[{unit_name}] 预测失败: 无结果")
                 return False
             
-            # 步骤2:提取该机组的结果
+            # 显示预测得到的所有段
+            all_predicted_stages = result_df['机组类型'].tolist()
+            logger.info(f"[{unit_name}] 预测完成,得到 {len(all_predicted_stages)} 个段: {all_predicted_stages}")
+            
+            # 步骤3:只提取触发段的结果(关键:只处理触发的段!)
+            logger.info(f"[{unit_name}] 只提取触发段 '{unit_name}' 的结果,其他段丢弃")
             unit_result = result_df[result_df['机组类型'] == unit_name]
             if unit_result.empty:
-                logger.warning(f"[{unit_name}] 预测失败: 结果中无此机组")
+                logger.warning(f"[{unit_name}] 预测失败: 结果中无此机组段")
+                logger.info(f"[{unit_name}] 可用的机组类型: {result_df['机组类型'].tolist()}")
                 return False
             
             cip_time = unit_result.iloc[0]['CIP时机']
@@ -284,11 +348,12 @@ class SmartCIPMonitor:
                 logger.warning(f"[{unit_name}] 预测失败: CIP时机为空")
                 return False
             
-            # 步骤3:保存预测时间到配置文件(仅用于记录
+            # 步骤4:只保存触发段的预测时间到配置文件(其他段不保存
             predicted_time_str = cip_time.strftime('%Y-%m-%d %H:%M:%S')
+            logger.info(f"[{unit_name}] 只保存触发段的 predicted_time: {predicted_time_str}")
             
             if unit_name not in self.config['cip_times']:
-                # 配置中不存在该机组,创建新记录
+                # 配置中不存在该机组,创建新记录
                 self.config['cip_times'][unit_name] = {
                     'actual_time': start_date_str,
                     'predicted_time': predicted_time_str
@@ -311,13 +376,19 @@ class SmartCIPMonitor:
             
             logger.info(f"[{unit_name}] 预测成功 → {predicted_time_str}")
             
-            # 步骤4:发送预测结果到回调接口
-            send_decision_to_callback(result_df)
+            # 步骤5:只发送触发段的回调(其他段不发送)
+            logger.info(f"[{unit_name}] 只发送触发段的回调")
+            send_decision_to_callback(unit_result)
+            
+            # 明确说明哪些段被丢弃了
+            other_stages = [s for s in all_predicted_stages if s != unit_name]
+            if other_stages:
+                logger.info(f"[{unit_name}] 其他段的结果已丢弃,等它们自己触发后再处理: {other_stages}")
             
             return True
                 
         except Exception as e:
-            logger.error(f"[{unit_name}] 预测失败: {e}")
+            logger.error(f"[{unit_name}] 预测失败: {e}", exc_info=True)
             return False
     
     def process_changes(self, changes):
@@ -341,7 +412,7 @@ class SmartCIPMonitor:
             # 执行预测
             success = self.predict_and_update_unit(unit_name, new_actual_time)
             
-            # ⚠️ 关键修复:无论预测成功还是失败,都要更新监控基准,避免无限循环
+            # 无论预测成功还是失败,都要更新监控基准,避免无限循环
             self.last_seen_actual_times[unit_name] = new_actual_time
             
             if not success: