|
@@ -17,6 +17,8 @@ CIP时机选择策略:
|
|
|
|
|
|
|
|
import pandas as pd
|
|
import pandas as pd
|
|
|
import numpy as np
|
|
import numpy as np
|
|
|
|
|
+import logging
|
|
|
|
|
+from logging.handlers import RotatingFileHandler
|
|
|
from sklearn.linear_model import LinearRegression
|
|
from sklearn.linear_model import LinearRegression
|
|
|
from fouling_model_0922.predict import Predictor
|
|
from fouling_model_0922.predict import Predictor
|
|
|
import warnings
|
|
import warnings
|
|
@@ -29,6 +31,28 @@ import os
|
|
|
|
|
|
|
|
warnings.filterwarnings('ignore', category=FutureWarning)
|
|
warnings.filterwarnings('ignore', category=FutureWarning)
|
|
|
|
|
|
|
|
|
|
+# 日志系统配置
|
|
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
|
|
+logger.setLevel(logging.INFO)
|
|
|
|
|
+
|
|
|
|
|
+# 日志输出格式
|
|
|
|
|
+formatter = logging.Formatter(
|
|
|
|
|
+ '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
|
|
|
|
|
+ datefmt='%Y-%m-%d %H:%M:%S'
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+# 文件日志处理器,单个文件最大5MB,保留3个备份
|
|
|
|
|
+file_handler = RotatingFileHandler('main_simple.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
|
|
|
|
|
+file_handler.setFormatter(formatter)
|
|
|
|
|
+
|
|
|
|
|
+# 控制台日志处理器
|
|
|
|
|
+console_handler = logging.StreamHandler()
|
|
|
|
|
+console_handler.setFormatter(formatter)
|
|
|
|
|
+
|
|
|
|
|
+# 添加处理器
|
|
|
|
|
+logger.addHandler(file_handler)
|
|
|
|
|
+logger.addHandler(console_handler)
|
|
|
|
|
+
|
|
|
# 加载配置文件
|
|
# 加载配置文件
|
|
|
def load_config():
|
|
def load_config():
|
|
|
"""
|
|
"""
|
|
@@ -41,10 +65,9 @@ def load_config():
|
|
|
try:
|
|
try:
|
|
|
with open(config_path, 'r', encoding='utf-8') as f:
|
|
with open(config_path, 'r', encoding='utf-8') as f:
|
|
|
config = json.load(f)
|
|
config = json.load(f)
|
|
|
- print(f"配置文件加载成功: {config_path}")
|
|
|
|
|
return config
|
|
return config
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- print(f"配置文件加载失败: {e}")
|
|
|
|
|
|
|
+ logger.error(f"配置文件加载失败: {e}")
|
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
# 加载配置
|
|
# 加载配置
|
|
@@ -101,18 +124,13 @@ def update_cip_history_in_config(result_df):
|
|
|
with open(config_path, 'w', encoding='utf-8') as f:
|
|
with open(config_path, 'w', encoding='utf-8') as f:
|
|
|
json.dump(current_config, f, ensure_ascii=False, indent=2)
|
|
json.dump(current_config, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
|
|
- print(f"CIP预测时间已保存:")
|
|
|
|
|
- for update in updated_units:
|
|
|
|
|
- print(f" {update}")
|
|
|
|
|
-
|
|
|
|
|
config = current_config
|
|
config = current_config
|
|
|
return True
|
|
return True
|
|
|
else:
|
|
else:
|
|
|
- print("无CIP时间需要保存")
|
|
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- print(f"保存CIP预测时间失败: {e}")
|
|
|
|
|
|
|
+ logger.error(f"保存CIP预测时间失败: {e}")
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
def validate_data(data, name="数据"):
|
|
def validate_data(data, name="数据"):
|
|
@@ -168,7 +186,6 @@ class OptimalCIPPredictor:
|
|
|
self.window_hours = window_days * 24 # 转换为小时
|
|
self.window_hours = window_days * 24 # 转换为小时
|
|
|
self.min_continuous_rising = min_continuous_rising
|
|
self.min_continuous_rising = min_continuous_rising
|
|
|
self.min_delay_days = min_delay_days
|
|
self.min_delay_days = min_delay_days
|
|
|
- print(f"预测器初始化: 窗口={window_days}天, 连续上升>={min_continuous_rising}点, 延迟>={min_delay_days}天")
|
|
|
|
|
|
|
|
|
|
def calculate_sliding_k_values(self, pressure_series):
|
|
def calculate_sliding_k_values(self, pressure_series):
|
|
|
"""
|
|
"""
|
|
@@ -444,18 +461,17 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
|
|
|
pd.DataFrame: 包含机组类型、CIP时机、策略说明的结果表
|
|
pd.DataFrame: 包含机组类型、CIP时机、策略说明的结果表
|
|
|
"""
|
|
"""
|
|
|
# 初始化日志记录器
|
|
# 初始化日志记录器
|
|
|
- logger = CIPAnalysisLogger()
|
|
|
|
|
|
|
+ analysis_logger = CIPAnalysisLogger()
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
# 获取预测数据
|
|
# 获取预测数据
|
|
|
- print("获取预测数据...")
|
|
|
|
|
try:
|
|
try:
|
|
|
all_data = Predictor().predict(start_date=start_date) # 获取预测数据
|
|
all_data = Predictor().predict(start_date=start_date) # 获取预测数据
|
|
|
if all_data.empty:
|
|
if all_data.empty:
|
|
|
- logger.logger.error("预测数据为空")
|
|
|
|
|
|
|
+ analysis_logger.logger.error("预测数据为空")
|
|
|
return pd.DataFrame()
|
|
return pd.DataFrame()
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- logger.logger.error(f"获取预测数据失败: {e}")
|
|
|
|
|
|
|
+ analysis_logger.logger.error(f"获取预测数据失败: {e}")
|
|
|
return pd.DataFrame()
|
|
return pd.DataFrame()
|
|
|
|
|
|
|
|
# 将date列设置为索引
|
|
# 将date列设置为索引
|
|
@@ -463,23 +479,11 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
|
|
|
|
|
|
|
|
# 获取预测数据的起始时间
|
|
# 获取预测数据的起始时间
|
|
|
prediction_start_date = all_data.index[0].to_pydatetime()
|
|
prediction_start_date = all_data.index[0].to_pydatetime()
|
|
|
- print(f"预测数据起始时间: {prediction_start_date.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
|
|
|
+ logger.info(f"预测起始: {prediction_start_date.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
|
|
|
|
# 记录输入参数和预测数据
|
|
# 记录输入参数和预测数据
|
|
|
- logger.log_input_parameters(strategy, start_date, prediction_start_date)
|
|
|
|
|
- logger.log_prediction_data(all_data)
|
|
|
|
|
-
|
|
|
|
|
- # 显示配置的CIP时间状态
|
|
|
|
|
- if config and 'cip_times' in config:
|
|
|
|
|
- print(f"\n当前配置的CIP时间状态:")
|
|
|
|
|
- for unit_name, cip_data in config['cip_times'].items():
|
|
|
|
|
- if not unit_name.startswith('_'):
|
|
|
|
|
- if isinstance(cip_data, dict):
|
|
|
|
|
- actual = cip_data.get('actual_time', 'N/A')
|
|
|
|
|
- predicted = cip_data.get('predicted_time', 'N/A')
|
|
|
|
|
- print(f" {unit_name}: 实际={actual}, 预测={predicted}")
|
|
|
|
|
- else:
|
|
|
|
|
- print(f" {unit_name}: {cip_data}")
|
|
|
|
|
|
|
+ analysis_logger.log_input_parameters(strategy, start_date, prediction_start_date)
|
|
|
|
|
+ analysis_logger.log_prediction_data(all_data)
|
|
|
|
|
|
|
|
# 确定要分析的机组
|
|
# 确定要分析的机组
|
|
|
if unit_filter:
|
|
if unit_filter:
|
|
@@ -494,32 +498,21 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
|
|
|
unit_days_dict[unit_id] = get_unit_days(unit_id, prediction_start_date)
|
|
unit_days_dict[unit_id] = get_unit_days(unit_id, prediction_start_date)
|
|
|
|
|
|
|
|
# 记录机组预测天数
|
|
# 记录机组预测天数
|
|
|
- logger.log_unit_days(unit_days_dict)
|
|
|
|
|
|
|
+ analysis_logger.log_unit_days(unit_days_dict)
|
|
|
|
|
|
|
|
# 初始化预测器
|
|
# 初始化预测器
|
|
|
predictor = OptimalCIPPredictor(window_days=7, min_continuous_rising=3, min_delay_days=30)
|
|
predictor = OptimalCIPPredictor(window_days=7, min_continuous_rising=3, min_delay_days=30)
|
|
|
|
|
|
|
|
- # 策略说明
|
|
|
|
|
- strategy_names = {
|
|
|
|
|
- 1: "最早时机策略",
|
|
|
|
|
- 2: "最晚时机策略",
|
|
|
|
|
- 3: "加权平均策略",
|
|
|
|
|
- 4: "污染严重程度策略"
|
|
|
|
|
- }
|
|
|
|
|
- print(f"\n使用策略: {strategy_names.get(strategy, '未知策略')}")
|
|
|
|
|
-
|
|
|
|
|
# 存储分析结果
|
|
# 存储分析结果
|
|
|
results = []
|
|
results = []
|
|
|
|
|
|
|
|
# 遍历分析各机组
|
|
# 遍历分析各机组
|
|
|
for unit_id in unit_ids:
|
|
for unit_id in unit_ids:
|
|
|
- print(f"\n分析机组 RO{unit_id}")
|
|
|
|
|
-
|
|
|
|
|
# 获取该机组的预测天数
|
|
# 获取该机组的预测天数
|
|
|
predict_days = unit_days_dict[unit_id]
|
|
predict_days = unit_days_dict[unit_id]
|
|
|
|
|
|
|
|
# 记录分析开始
|
|
# 记录分析开始
|
|
|
- logger.log_unit_analysis_start(unit_id, predict_days)
|
|
|
|
|
|
|
+ analysis_logger.log_unit_analysis_start(unit_id, predict_days)
|
|
|
|
|
|
|
|
# 截取预测天数范围内的数据
|
|
# 截取预测天数范围内的数据
|
|
|
end_time = all_data.index[0] + timedelta(days=predict_days)
|
|
end_time = all_data.index[0] + timedelta(days=predict_days)
|
|
@@ -530,13 +523,11 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
|
|
|
pressure_columns = [col for col in truncated_data.columns if ro_name in col and 'DPT' in col and 'pred' in col]
|
|
pressure_columns = [col for col in truncated_data.columns if ro_name in col and 'DPT' in col and 'pred' in col]
|
|
|
|
|
|
|
|
if not pressure_columns:
|
|
if not pressure_columns:
|
|
|
- print(f"警告: 未找到{ro_name}的压差列")
|
|
|
|
|
|
|
+ logger.warning(f"[{ro_name}] 未找到压差列")
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- print(f"找到{ro_name}压差列: {len(pressure_columns)}个")
|
|
|
|
|
-
|
|
|
|
|
# 记录压差数据
|
|
# 记录压差数据
|
|
|
- logger.log_unit_pressure_data(unit_id, truncated_data, pressure_columns)
|
|
|
|
|
|
|
+ analysis_logger.log_unit_pressure_data(unit_id, truncated_data, pressure_columns)
|
|
|
|
|
|
|
|
# 收集各段的CIP分析结果
|
|
# 收集各段的CIP分析结果
|
|
|
cip_results = []
|
|
cip_results = []
|
|
@@ -554,7 +545,7 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
|
|
|
optimal_time, analysis = predictor.find_optimal_cip_time(pressure_series)
|
|
optimal_time, analysis = predictor.find_optimal_cip_time(pressure_series)
|
|
|
|
|
|
|
|
# 记录分析结果
|
|
# 记录分析结果
|
|
|
- logger.log_cip_analysis_result(unit_id, column, optimal_time, analysis)
|
|
|
|
|
|
|
+ analysis_logger.log_cip_analysis_result(unit_id, column, optimal_time, analysis)
|
|
|
|
|
|
|
|
if optimal_time:
|
|
if optimal_time:
|
|
|
cip_results.append({
|
|
cip_results.append({
|
|
@@ -563,13 +554,9 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
|
|
|
'delay_days': analysis['delay_days'],
|
|
'delay_days': analysis['delay_days'],
|
|
|
'k_value': analysis['best_k']
|
|
'k_value': analysis['best_k']
|
|
|
})
|
|
})
|
|
|
- print(f" {column}: {optimal_time} (第{analysis['delay_days']}天, k={analysis['best_k']:.6f})")
|
|
|
|
|
- else:
|
|
|
|
|
- print(f" {column}: 未找到CIP时机 - {analysis.get('error', '未知原因')}")
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- print(f" {column}: 分析失败 - {str(e)}")
|
|
|
|
|
- logger.log_cip_analysis_result(unit_id, column, None, {"error": str(e)})
|
|
|
|
|
|
|
+ analysis_logger.log_cip_analysis_result(unit_id, column, None, {"error": str(e)})
|
|
|
|
|
|
|
|
# 根据策略选择最优CIP时机
|
|
# 根据策略选择最优CIP时机
|
|
|
if cip_results:
|
|
if cip_results:
|
|
@@ -579,42 +566,36 @@ def analyze_ro_unit_cip_timing(strategy=1, start_date=None, unit_filter=None):
|
|
|
'CIP时机': optimal_time,
|
|
'CIP时机': optimal_time,
|
|
|
'策略说明': strategy_desc
|
|
'策略说明': strategy_desc
|
|
|
})
|
|
})
|
|
|
- print(f"RO{unit_id}最优CIP时机: {optimal_time}")
|
|
|
|
|
- print(f"策略: {strategy_desc}")
|
|
|
|
|
|
|
+ logger.info(f"[RO{unit_id}] CIP时机 → {optimal_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
|
|
|
|
- logger.log_unit_strategy_result(unit_id, optimal_time, strategy_desc)
|
|
|
|
|
|
|
+ analysis_logger.log_unit_strategy_result(unit_id, optimal_time, strategy_desc)
|
|
|
else:
|
|
else:
|
|
|
results.append({
|
|
results.append({
|
|
|
'机组类型': f"RO{unit_id}",
|
|
'机组类型': f"RO{unit_id}",
|
|
|
'CIP时机': None,
|
|
'CIP时机': None,
|
|
|
'策略说明': "无有效CIP时机"
|
|
'策略说明': "无有效CIP时机"
|
|
|
})
|
|
})
|
|
|
- print(f"RO{unit_id}: 未找到有效CIP时机")
|
|
|
|
|
|
|
+ logger.warning(f"[RO{unit_id}] 未找到有效CIP时机")
|
|
|
|
|
|
|
|
- logger.log_unit_strategy_result(unit_id, None, "无有效CIP时机")
|
|
|
|
|
|
|
+ analysis_logger.log_unit_strategy_result(unit_id, None, "无有效CIP时机")
|
|
|
|
|
|
|
|
# 生成结果DataFrame
|
|
# 生成结果DataFrame
|
|
|
result_df = pd.DataFrame(results)
|
|
result_df = pd.DataFrame(results)
|
|
|
|
|
|
|
|
# 记录最终结果
|
|
# 记录最终结果
|
|
|
- logger.log_final_results(result_df)
|
|
|
|
|
|
|
+ analysis_logger.log_final_results(result_df)
|
|
|
|
|
|
|
|
# 生成分析图表
|
|
# 生成分析图表
|
|
|
- logger.create_analysis_plots(all_data, unit_days_dict)
|
|
|
|
|
-
|
|
|
|
|
- print("\n" + "="*50)
|
|
|
|
|
- print("分析完成")
|
|
|
|
|
- print("="*50)
|
|
|
|
|
- print(result_df.to_string(index=False))
|
|
|
|
|
|
|
+ analysis_logger.create_analysis_plots(all_data, unit_days_dict)
|
|
|
|
|
|
|
|
return result_df
|
|
return result_df
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- logger.logger.error(f"分析过程中发生错误: {e}")
|
|
|
|
|
|
|
+ analysis_logger.logger.error(f"分析过程中发生错误: {e}")
|
|
|
raise
|
|
raise
|
|
|
finally:
|
|
finally:
|
|
|
# 确保日志记录器正确关闭
|
|
# 确保日志记录器正确关闭
|
|
|
- logger.close()
|
|
|
|
|
|
|
+ analysis_logger.close()
|
|
|
|
|
|
|
|
def main(strategy=3, start_date=None, unit_filter=None):
|
|
def main(strategy=3, start_date=None, unit_filter=None):
|
|
|
"""
|
|
"""
|
|
@@ -639,20 +620,14 @@ def main(strategy=3, start_date=None, unit_filter=None):
|
|
|
result_df = main(start_date='2025-07-01 00:00:00') # 指定时间
|
|
result_df = main(start_date='2025-07-01 00:00:00') # 指定时间
|
|
|
result_df = main(strategy=1, unit_filter='RO1') # 指定策略和机组
|
|
result_df = main(strategy=1, unit_filter='RO1') # 指定策略和机组
|
|
|
"""
|
|
"""
|
|
|
- print("RO膜污染监控与CIP预测")
|
|
|
|
|
- print("=" * 50)
|
|
|
|
|
-
|
|
|
|
|
# 执行分析
|
|
# 执行分析
|
|
|
result_df = analyze_ro_unit_cip_timing(strategy=strategy, start_date=start_date, unit_filter=unit_filter)
|
|
result_df = analyze_ro_unit_cip_timing(strategy=strategy, start_date=start_date, unit_filter=unit_filter)
|
|
|
|
|
|
|
|
# 发送回调
|
|
# 发送回调
|
|
|
if config and not result_df.empty:
|
|
if config and not result_df.empty:
|
|
|
- print("\n发送决策结果...")
|
|
|
|
|
callback_success = send_decision_to_callback(result_df)
|
|
callback_success = send_decision_to_callback(result_df)
|
|
|
- if callback_success:
|
|
|
|
|
- print("决策结果发送成功")
|
|
|
|
|
- else:
|
|
|
|
|
- print("决策结果发送失败")
|
|
|
|
|
|
|
+ if not callback_success:
|
|
|
|
|
+ logger.error("决策结果发送失败")
|
|
|
|
|
|
|
|
return result_df
|
|
return result_df
|
|
|
|
|
|
|
@@ -669,7 +644,7 @@ def send_decision_to_callback(decision_data):
|
|
|
bool: 发送成功返回True,失败返回False
|
|
bool: 发送成功返回True,失败返回False
|
|
|
"""
|
|
"""
|
|
|
if config is None:
|
|
if config is None:
|
|
|
- print("配置文件未加载")
|
|
|
|
|
|
|
+ logger.error("配置文件未加载")
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
@@ -703,36 +678,29 @@ def send_decision_to_callback(decision_data):
|
|
|
"list": callback_list
|
|
"list": callback_list
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- # 日志:显示待发送数据
|
|
|
|
|
- log_type = callback_list[0]["type"] if callback_list else "UNKNOWN"
|
|
|
|
|
- print(f"[{log_type}] 准备发送决策数据:")
|
|
|
|
|
- print(f"{json.dumps(payload, indent=2, ensure_ascii=False)}")
|
|
|
|
|
-
|
|
|
|
|
# 发送HTTP请求(带重试机制)
|
|
# 发送HTTP请求(带重试机制)
|
|
|
max_retries = 3
|
|
max_retries = 3
|
|
|
retry_interval = 10
|
|
retry_interval = 10
|
|
|
|
|
+ log_type = callback_list[0]["type"] if callback_list else "UNKNOWN"
|
|
|
|
|
|
|
|
for attempt in range(1, max_retries + 1):
|
|
for attempt in range(1, max_retries + 1):
|
|
|
try:
|
|
try:
|
|
|
- print(f"[{log_type}] 第{attempt}/{max_retries}次尝试发送...")
|
|
|
|
|
response = requests.post(callback_url, headers=headers, json=payload, timeout=15)
|
|
response = requests.post(callback_url, headers=headers, json=payload, timeout=15)
|
|
|
response.raise_for_status()
|
|
response.raise_for_status()
|
|
|
|
|
|
|
|
- print(f"[{log_type}] 决策数据发送成功,服务器响应: {response.text}")
|
|
|
|
|
|
|
+ logger.info(f"[{log_type}] 回调成功")
|
|
|
return True
|
|
return True
|
|
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
except requests.exceptions.RequestException as e:
|
|
|
- print(f"[{log_type}] 发送失败: {e}")
|
|
|
|
|
-
|
|
|
|
|
if attempt < max_retries:
|
|
if attempt < max_retries:
|
|
|
- print(f"[{log_type}] {retry_interval}秒后重试...")
|
|
|
|
|
time.sleep(retry_interval)
|
|
time.sleep(retry_interval)
|
|
|
|
|
+ else:
|
|
|
|
|
+ logger.error(f"[{log_type}] 回调失败(重试{max_retries}次): {e}")
|
|
|
|
|
|
|
|
- print(f"[{log_type}] 所有重试均失败")
|
|
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- print(f"构建回调数据时出错: {e}")
|
|
|
|
|
|
|
+ logger.error(f"构建回调数据时出错: {e}")
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|