# 异常检测模块 (Anomaly Detection) > **版本**: 1.0.0 > **最后更新**: 2025-11-04 > **功能**: 对双膜系统(UF超滤 + RO反渗透)关键运行指标进行实时异常检测 --- ## 模块概述 异常检测模块用于监测双膜水处理系统的关键运行指标,及时发现设备异常、性能退化等问题。 ### 主要特性 - **多模型融合**: 支持孤立森林(Isolation Forest)、3σ统计、One-Class SVM三种检测方法 - **逐列检测**: 对每个指标独立建模,精准定位异常来源 - **无需标注**: 基于无监督学习,不需要人工标注异常样本 - **实时检测**: 支持在线预测,快速识别异常点 - **批量处理**: 自动读取和合并多批次历史数据 ### 应用场景 1. **设备异常预警**: 膜组件堵塞、泵压异常、流量突变 2. **性能监控**: 膜渗透率下降、产水量异常 3. **维护决策**: 基于异常频率制定清洗/更换计划 4. **质量控制**: 确保系统稳定运行 --- ## 目录结构 ``` models/anomaly_detection/ ├── detection.py # 主程序(训练+预测) ├── README.md # 本文档 ├── scaler.pkl # 数据归一化器(训练产物) ├── isolation_forest_models.pkl # 孤立森林模型(训练产物) ├── three_sigma_model.pkl # 3σ统计模型(训练产物) └── datasets_export_xishan/ # 数据目录(需自行准备) ├── data_export5_1.csv # UF渗透率数据 ├── data_export8_1.csv # RO1/RO2压差数据 ├── data_export9_1.csv # RO3/RO4压差数据 ├── data_export11_1.csv # RO产水流量数据 └── ... (data_export*_2.csv ~ data_export*_26.csv) ``` --- ## 核心功能 ### 1. 数据加载与合并 - 批量读取 26 批次历史数据(`data_export*_1.csv ~ data_export*_26.csv`) - 自动提取关键指标列并纵向合并 - 容错处理:文件缺失时跳过并记录日志 ### 2. 数据预处理 - **MinMax归一化**: 将所有指标缩放至 [0, 1] 区间 - **逐列处理**: 每个指标独立归一化,避免量纲影响 - **保存scaler**: 归一化器保存为 `scaler.pkl`,用于在线预测 ### 3. 模型训练 #### 孤立森林 (Isolation Forest) - 基于树结构随机分割,异常点更容易被孤立 - 参数:`n_estimators=100`, `contamination='auto'` - 适用场景:单变量离群点检测 #### 3σ统计法 (Three Sigma) - 基于正态分布假设,超出 μ±3σ 视为异常 - 参数:`n_sigma=3`(可调) - 适用场景:稳态数据、可解释性要求高 #### One-Class SVM (可选) - 基于核函数学习正常数据边界 - 参数:`nu=0.05`, `kernel='rbf'` - 适用场景:复杂边界、非线性分布 ### 4. 异常预测 - 输入归一化后的数据 - 输出预测标签:`-1=异常`, `1=正常` - 支持批量预测和实时检测 --- ## 监测指标 ### UF(超滤)指标 | 指标名称 | 含义 | 单位 | 正常范围 | |---------|------|------|---------| | UF1Per | UF1膜渗透率 | - | 根据历史数据确定 | | UF2Per | UF2膜渗透率 | - | 根据历史数据确定 | | UF3Per | UF3膜渗透率 | - | 根据历史数据确定 | | UF4Per | UF4膜渗透率 | - | 根据历史数据确定 | ### RO(反渗透)指标 | 指标名称 | 含义 | 单位 | 异常情况 | |---------|------|------|---------| | C.M.RO1_DB@DPT_1 | RO1一段压差 | kPa | 压差过高→膜污堵 | | C.M.RO1_DB@DPT_2 | RO1二段压差 | kPa | 压差过高→膜污堵 | | C.M.RO2_DB@DPT_1 | RO2一段压差 | kPa | 压差过高→膜污堵 | | C.M.RO2_DB@DPT_2 | RO2二段压差 | kPa | 压差过高→膜污堵 | | C.M.RO3_DB@DPT_1 | RO3一段压差 | kPa | 压差过高→膜污堵 | | C.M.RO3_DB@DPT_2 | RO3二段压差 | kPa | 压差过高→膜污堵 | | C.M.RO4_DB@DPT_1 | RO4一段压差 | kPa | 压差过高→膜污堵 | | C.M.RO4_DB@DPT_2 | RO4二段压差 | kPa | 压差过高→膜污堵 | | RO1_CSFlow | RO1产水流量 | m³/h | 流量异常→泵/阀问题 | | RO2_CSFlow | RO2产水流量 | m³/h | 流量异常→泵/阀问题 | | RO3_CSFlow | RO3产水流量 | m³/h | 流量异常→泵/阀问题 | | RO4_CSFlow | RO4产水流量 | m³/h | 流量异常→泵/阀问题 | **共16个监测指标** --- ## 技术架构 ### 整体流程图 ```mermaid graph TB subgraph 数据加载阶段 A[开始] --> B1[读取data_export5_*.csv] A --> B2[读取data_export8_*.csv] A --> B3[读取data_export9_*.csv] A --> B4[读取data_export11_*.csv] B1 --> C[提取UF1Per/UF2Per/UF3Per/UF4Per] B2 --> D[提取RO1/RO2的DPT_1和DPT_2] B3 --> E[提取RO3/RO4的DPT_1和DPT_2] B4 --> F[提取RO1~RO4的CSFlow] C --> G[纵向合并所有批次数据] D --> G E --> G F --> G end subgraph 数据预处理阶段 G --> H[DataFrame: 16个监测指标列] H --> I[MinMaxScaler归一化到0-1区间] I --> J[保存scaler.pkl] end subgraph 模型训练阶段 J --> K{逐列训练} K --> L1[孤立森林训练] K --> L2[3σ统计计算] K --> L3[One-Class SVM训练可选] L1 --> M1[遍历16个指标列] M1 --> N1[每列fit一个IsolationForest模型] N1 --> O1[保存isolation_forest_models.pkl] L2 --> M2[遍历16个指标列] M2 --> N2[每列计算mean和std] N2 --> O2[保存three_sigma_model.pkl] L3 --> M3[遍历16个指标列] M3 --> N3[每列fit一个OneClassSVM模型] N3 --> O3[保存one_class_svm_models.pkl] end subgraph 异常检测阶段 O1 --> P[加载新数据] O2 --> P O3 --> P P --> Q[使用scaler归一化新数据] Q --> R{选择模型预测} R --> S1[孤立森林预测] R --> S2[3σ预测] R --> S3[One-Class SVM预测] S1 --> T[结果融合可选] S2 --> T S3 --> T T --> U[输出预测结果: -1异常/1正常] U --> V[结束] end ``` ### 详细训练与预测流程 ```mermaid sequenceDiagram participant User as 用户 participant Script as detection.py participant Data as 数据文件 participant Model as 模型 participant Result as 预测结果 Note over User,Result: 训练阶段 User->>Script: 运行python detection.py Script->>Data: 批量读取26批次CSV Data-->>Script: 返回DataFrame(N行×16列) Script->>Script: MinMaxScaler归一化 Script->>Script: 保存scaler.pkl loop 对每个指标列 Script->>Model: 训练IsolationForest Script->>Model: 计算3σ统计量 Script->>Model: 训练OneClassSVM(可选) end Script->>Script: 保存所有模型.pkl Script-->>User: 训练完成 Note over User,Result: 预测阶段 User->>Script: 提供新数据DataFrame Script->>Script: 加载scaler.pkl和模型 Script->>Script: 归一化新数据 loop 对每个指标列 Script->>Model: 调用predict() Model-->>Script: 返回-1或1 end Script->>Result: 生成预测DataFrame Result-->>User: 返回异常检测结果 ``` ### 关键技术 - **sklearn.ensemble.IsolationForest**: 孤立森林实现 - **sklearn.svm.OneClassSVM**: 单类SVM实现 - **sklearn.preprocessing.MinMaxScaler**: 数据归一化 - **pandas**: 数据处理 - **joblib**: 模型序列化 --- ## 数据来源 ### 文件命名规则 | 文件模板 | 包含指标 | 数量 | |---------|---------|------| | `data_export5_{i}.csv` | UF1Per, UF2Per, UF3Per, UF4Per | 4列 | | `data_export8_{i}.csv` | RO1/RO2的DPT_1和DPT_2 | 4列 | | `data_export9_{i}.csv` | RO3/RO4的DPT_1和DPT_2 | 4列 | | `data_export11_{i}.csv` | RO1~RO4的CSFlow | 4列 | 其中 `{i}` 为批次号,范围 `1~26` ### 数据格式要求 ```csv UF1Per,UF2Per,UF3Per,UF4Per 0.85,0.87,0.83,0.86 0.84,0.86,0.82,0.85 ... ``` - CSV格式,逗号分隔 - 第一行为列名(必须与代码中定义的列名一致) - 数值型数据,缺失值用空或NaN表示 --- ## 使用指南 ### 1. 环境准备 ```bash # 安装依赖 pip install numpy pandas scikit-learn joblib matplotlib # 确认目录结构 cd models/anomaly_detection/ ls datasets_export_xishan/ # 确认数据文件存在 ``` ### 2. 训练模型 ```bash # 运行主程序 python detection.py ``` **输出示例**: ``` 开始加载数据... 成功读取: data_export5_1.csv 成功读取: data_export8_1.csv ... 数据合并完成,总样本数: 125680 开始数据归一化... 归一化器已保存为 scaler.pkl 开始训练孤立森林模型... 已训练 UF1Per 的孤立森林模型 已训练 UF2Per 的孤立森林模型 ... 孤立森林模型已保存为 isolation_forest_models.pkl 开始训练3σ模型... 已计算 UF1Per 的3σ统计量 ... 3σ模型已保存为 three_sigma_model.pkl 所有模型训练和保存完成! ``` ### 3. 使用模型预测(示例代码) ```python import pandas as pd import joblib # 加载模型和归一化器 scaler = joblib.load("scaler.pkl") if_model = joblib.load("isolation_forest_models.pkl") ts_model = joblib.load("three_sigma_model.pkl") # 准备新数据(需包含所有16个指标列) new_data = pd.DataFrame({ 'UF1Per': [0.85, 0.82], 'UF2Per': [0.87, 0.84], # ... 其他14个指标 }) # 归一化 normalized_data = scaler.transform(new_data) normalized_df = pd.DataFrame(normalized_data, columns=new_data.columns) # 孤立森林预测 if_predictions = if_model.predict(normalized_df) print("孤立森林预测:", if_predictions) # 3σ预测 ts_predictions = ts_model.predict(normalized_df, n_sigma=3) print("3σ预测:", ts_predictions) # 结果融合(投票法) # -1=异常, 1=正常 final_predictions = (if_predictions + ts_predictions) // 2 ``` --- ## 模型说明 ### 1. 孤立森林 (Isolation Forest) **原理**: - 通过随机选择特征和分割点构建多棵树 - 异常点更容易被孤立(需要更少的分割次数) - 路径长度越短,异常性越高 **优点**: - 无需标注数据 - 对高维数据有效 - 计算效率高 **缺点**: - 对正常数据密度不均匀的情况敏感 - contamination参数需要先验知识 **参数说明**: | 参数 | 值 | 说明 | |-----|-----|------| | n_estimators | 100 | 树的数量 | | contamination | 'auto' | 异常比例(自动估计) | | random_state | 42 | 随机种子 | ### 2. 3σ统计法 (Three Sigma) **原理**: - 假设数据服从正态分布 - 计算均值μ和标准差σ - 超出 [μ-3σ, μ+3σ] 范围视为异常(覆盖99.7%正常数据) **优点**: - 简单直观,易于理解和解释 - 计算速度快 - 可调整灵敏度(修改n_sigma) **缺点**: - 假设数据正态分布(偏态分布效果差) - 对极端异常值敏感 **参数说明**: | 参数 | 值 | 说明 | |-----|-----|------| | n_sigma | 3 | 阈值系数(推荐2~4) | ### 3. One-Class SVM (可选) **原理**: - 在高维空间寻找包含正常数据的最小超球面 - 核函数将数据映射到高维空间 - 边界外的点视为异常 **优点**: - 适合复杂非线性边界 - 理论基础扎实 **缺点**: - 计算复杂度高(大数据集慢) - 参数调优困难(nu、gamma) - 对数据尺度敏感 **参数说明**: | 参数 | 值 | 说明 | |-----|-----|------| | nu | 0.05 | 异常比例上界(5%) | | kernel | 'rbf' | 径向基核函数 | | gamma | 'scale' | 核系数(自动) | **启用方法**: 1. 在 `detection.py` 中取消 `OneClassSVMModel` 类的注释(第166-203行) 2. 取消 `main()` 函数中相关代码的注释(第227-250行) --- ## 配置参数 ### 数据路径配置 ```python # detection.py 第14行 data_folder = "datasets_export_xishan" ``` ### 文件模板配置 ```python # detection.py 第22-29行 file_info = { "data_export5_{}.csv": ["UF1Per", "UF2Per", "UF3Per", "UF4Per"], "data_export8_{}.csv": ["C.M.RO1_DB@DPT_1", "C.M.RO1_DB@DPT_2", "C.M.RO2_DB@DPT_1", "C.M.RO2_DB@DPT_2"], "data_export9_{}.csv": ["C.M.RO3_DB@DPT_1", "C.M.RO3_DB@DPT_2", "C.M.RO4_DB@DPT_1", "C.M.RO4_DB@DPT_2"], "data_export11_{}.csv": ["RO1_CSFlow", "RO2_CSFlow", "RO3_CSFlow", "RO4_CSFlow"] } ``` ### 模型参数调整 **孤立森林**: ```python # detection.py 第98行 model = IsolationForest( n_estimators=100, # 增大→更稳定,但训练慢 contamination='auto', # 或设置具体值如0.05 random_state=42 ) ``` **3σ统计**: ```python # 预测时调整 ts_predictions = ts_model.predict(normalized_df, n_sigma=3) # n_sigma=2 → 更敏感(95%覆盖) # n_sigma=4 → 更宽松(99.99%覆盖) ``` --- ## 输出结果 ### 1. 训练产物 | 文件名 | 大小 | 说明 | |--------|------|------| | scaler.pkl | ~2KB | MinMax归一化器 | | isolation_forest_models.pkl | ~500KB | 孤立森林模型(16个) | | three_sigma_model.pkl | ~1KB | 3σ统计量(16个指标的μ和σ) | ### 2. 预测结果格式 **DataFrame示例**: | UF1Per | UF2Per | C.M.RO1_DB@DPT_1 | ... | |--------|--------|------------------|-----| | 1 | 1 | -1 | ... | | 1 | -1 | 1 | ... | | 1 | 1 | 1 | ... | - `1`: 正常 - `-1`: 异常 ### 3. 结果分析 #### 单指标异常统计 ```python # 统计每个指标的异常率 anomaly_rate = (predictions == -1).sum() / len(predictions) print(f"异常率: {anomaly_rate * 100:.2f}%") ``` #### 多模型融合 ```python # 投票法:两个模型都判定为异常才认为异常 consensus = ((if_pred == -1) & (ts_pred == -1)).astype(int) consensus = np.where(consensus, -1, 1) ``` #### 异常时段定位 ```python # 找出连续异常的时间段(需要时间戳列) anomaly_indices = np.where(predictions == -1)[0] print(f"异常点索引: {anomaly_indices}") ``` --- ## 常见问题 ### Q1: 读取数据时报错"未找到文件" **原因**: `datasets_export_xishan/` 目录不存在或文件命名不符合规则 **解决**: ```bash # 检查目录 ls datasets_export_xishan/ # 确认文件命名格式 # 正确: data_export5_1.csv, data_export8_2.csv # 错误: data_export5-1.csv, export5_1.csv ``` ### Q2: One-Class SVM 报错 `NameError: name 'OneClassSVMModel' is not defined` **原因**: One-Class SVM 默认被注释 **解决**: 1. 打开 `detection.py` 2. 取消第166-203行的注释(类定义) 3. 取消第227-250行的注释(训练和预测代码) ### Q3: 训练时内存不足 **原因**: 数据量过大(26批次 × 数万样本) **解决**: ```python # detection.py 第208行后添加下采样 merged_data = load_and_merge_data() # 随机采样50%数据 merged_data = merged_data.sample(frac=0.5, random_state=42) ``` ### Q4: 预测结果全是异常或全是正常 **原因**: - 数据分布与训练数据差异大 - contamination参数不合理 **解决**: ```python # 调整contamination参数 model = IsolationForest( n_estimators=100, contamination=0.05, # 假设5%为异常 random_state=42 ) ``` ### Q5: 3σ方法对某些指标效果差 **原因**: 数据不服从正态分布(偏态、双峰) **解决**: ```python # 查看数据分布 import matplotlib.pyplot as plt df['UF1Per'].hist(bins=50) plt.show() # 考虑数据变换(如log变换)或使用孤立森林 ``` ### Q6: 如何评估模型效果? **方法**: 1. **人工标注部分样本**: 构建测试集计算准确率、召回率 2. **业务反馈**: 结合实际异常事件验证 3. **多模型对比**: 比较IF、3σ、OCSVM的一致性 ```python from sklearn.metrics import classification_report # 需要人工标注的真实标签 y_true = [1, 1, -1, 1, -1, ...] y_pred = if_model.predict(test_data) print(classification_report(y_true, y_pred)) ``` ### Q7: 如何集成到在线系统? **方案**: ```python # 1. 封装为函数 def detect_anomaly(new_data): """ 参数: new_data - DataFrame,包含16个指标列 返回: predictions - DataFrame,-1=异常, 1=正常 """ scaler = joblib.load("scaler.pkl") if_model = joblib.load("isolation_forest_models.pkl") normalized = scaler.transform(new_data) normalized_df = pd.DataFrame(normalized, columns=new_data.columns) return if_model.predict(normalized_df) # 2. 构建API(FastAPI示例) from fastapi import FastAPI app = FastAPI() @app.post("/detect") def detect(data: dict): df = pd.DataFrame([data]) result = detect_anomaly(df) return {"prediction": result.tolist()} ```