"""Rule-based anomaly detection and plotting for water-treatment sensors.

Loads ``data_process_<n>.csv`` files from a data directory, filters invalid
samples, flags anomalies against thresholds parsed from human-readable rule
descriptors, plots the whitelisted sensors and writes a CSV summary report.
"""
import os
import re

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.font_manager import FontProperties

# Font files probed in order; the first one that loads becomes the default
# font family.  The CJK-capable entries come first so Chinese labels render.
_FONT_CANDIDATES = (
    "C:/Windows/Fonts/simhei.ttf",  # Windows: SimHei
    "/System/Library/Fonts/PingFang.ttc",  # macOS: PingFang
    "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",  # Linux: Noto CJK
    "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",  # Linux: WenQuanYi
    # Last resort on Linux; Liberation Sans has no CJK glyphs.
    "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
)


def setup_chinese_font():
    """Configure matplotlib's default font, preferring a CJK-capable one.

    Falls back to ``["DejaVu Sans", "sans-serif"]`` when none of the
    candidate font files can be loaded.  Always disables the unicode minus
    sign so negative tick labels render with any of these fonts.
    """
    for font_path in _FONT_CANDIDATES:
        try:
            family = FontProperties(fname=font_path).get_name()
        except Exception:
            # Best-effort probe: a missing or unreadable font file simply
            # means "try the next candidate".  ``Exception`` (not a bare
            # ``except``) keeps KeyboardInterrupt/SystemExit propagating.
            continue
        plt.rcParams["font.family"] = family
        break
    else:
        plt.rcParams["font.family"] = ["DejaVu Sans", "sans-serif"]
    plt.rcParams["axes.unicode_minus"] = False


# Apply the font configuration at import time.
setup_chinese_font()

# Sensors for which a figure is produced.
PLOT_WHITELIST = [
    # 1. UF trans-membrane pressure (4)
    "C.M.UF1_DB@press_PV", "C.M.UF2_DB@press_PV",
    "C.M.UF3_DB@press_PV", "C.M.UF4_DB@press_PV",
    # 2. RO first-stage differential pressure (4)
    "C.M.RO1_DB@DPT_1", "C.M.RO2_DB@DPT_1",
    "C.M.RO3_DB@DPT_1", "C.M.RO4_DB@DPT_1",
    # 3. RO second-stage differential pressure (4)
    "C.M.RO1_DB@DPT_2", "C.M.RO2_DB@DPT_2",
    "C.M.RO3_DB@DPT_2", "C.M.RO4_DB@DPT_2",
    # 4. Total feed conductivity (1)
    "C.M.RO_Cond_ZJS@out",
    # 5. Permeate conductivity (4)
    "C.M.RO1_Cond_CS@out", "C.M.RO2_Cond_CS@out",
    "C.M.RO3_Cond_CS@out", "C.M.RO4_Cond_CS@out",
    # 6. Water-quality sensors (4)
    "C.M.RO_PH_ZJS@out",   # feed pH: abnormal outside 6-9
    "C.M.UF_Tur_ZJS@out",  # feed turbidity: abnormal above 10
    "C.M.UF_Tur_ZCS@out",  # permeate turbidity: abnormal above 0.15
    "C.M.PH_WGS@out",      # supply-water pH: abnormal outside 6-10
]

# Patterns used to turn a rule descriptor into a threshold tuple.
_NUM = r"(\d+\.?\d*)"
_OVER_RE = re.compile(r"超过" + _NUM)
_NOT_RANGE_RE = re.compile(r"不在" + _NUM + "-" + _NUM)
_GT_RE = re.compile(r"大于" + _NUM)
_RANGE_RE = re.compile(_NUM + "-" + _NUM)
_AT_LEAST_RE = re.compile(r"≥\s*" + _NUM)
_AT_MOST_RE = re.compile(r"≤\s*" + _NUM)


def _units(template, count=4):
    """Expand ``template`` (one ``{}`` slot) for unit numbers 1..count."""
    return [template.format(i) for i in range(1, count + 1)]


def _is_dual_range(thresholds):
    """Return True for a ``((a, b), (c, d))`` pair of normal ranges."""
    return len(thresholds) == 2 and all(isinstance(t, tuple) for t in thresholds)


class DirectSensorAnomalyDetector:
    """Threshold-based anomaly detector for pressure, conductivity and
    water-quality sensors, with plotting for the whitelisted ones.

    Attributes:
        data_dir: Directory holding ``data_process_<n>.csv`` input files.
        result_dir: Directory the summary report is written to.
        fig_dir: ``<result_dir>/figures``; figures are saved here.
        sensor_config: ``{variable: {"source": ..., "desc": ...}}`` where
            ``desc`` is the rule descriptor parsed by ``_parse_threshold``.
        results: Dict with ``raw_data``, ``valid_data`` and ``anomalies``
            (each keyed by variable) plus the ``threshold_summary`` rows.
    """

    def __init__(self, data_dir="datasets_xishan", result_dir="direct_detection_results"):
        """Create the output directories and build the sensor rule table."""
        self.data_dir = data_dir
        self.result_dir = result_dir
        self.fig_dir = os.path.join(result_dir, "figures")
        os.makedirs(self.result_dir, exist_ok=True)
        os.makedirs(self.fig_dir, exist_ok=True)

        self.sensor_config = self._build_sensor_config()

        self.results = {
            "raw_data": {},           # concatenated raw samples per sensor
            "valid_data": {},         # same length, invalid samples as NaN
            "anomalies": {},          # boolean mask, True = anomalous
            "threshold_summary": [],  # one dict per processed sensor
        }

    def _build_sensor_config(self):
        """Return the ordered ``{variable: {"source", "desc"}}`` rule table.

        Insertion order matters: it is the processing order and the row
        order of the summary report.
        """
        tolerance = "正常范围±10%-20%"
        config = {}

        # 1. Ultrafiltration (UF) sensors.
        uf_rules = [
            ("跨膜压差:超过0.06Mpa为异常", _units("C.M.UF{}_DB@press_PV")),
            ("进水流量:低于基准80%或110%-120%", _units("C.M.UF{}_FT_JS@out")),
            ("膜运行通量:49.5-60.5lmh", _units("UF{}_FluxF")),
            ("渗透率:247.5-302.5lmh/bar", _units("UF{}Per")),
            (tolerance, _units("C.M.UF{}_DW@press_PV")),
            (tolerance, _units("C.M.UF{}_PT_CS@out")),
            (tolerance, [
                "C.M.UF_FT_ZCS@out",
                "C.M.UF_PT_ZCS@out",
                "C.M.UF_PT_ZJS@out",
                "C.M.UF_FT_FX@out",
            ]),
        ]
        for desc, names in uf_rules:
            for var in names:
                config[var] = {"source": "UFExcel", "desc": desc}

        # 2. Reverse-osmosis (RO) sensors.
        ro_rules = [
            ("段间压差:超过0.3Mpa为异常", _units("C.M.RO{}_DB@DPT_1")),
            ("段间压差:超过0.3Mpa为异常", _units("C.M.RO{}_DB@DPT_2")),
            (tolerance, _units("C.M.RO{}_PT_JS@out")),
            (tolerance, _units("C.M.RO{}_FT_NS@out")),
            ("总进水电导:超过4000μs/cm为异常", ["C.M.RO_Cond_ZJS@out"]),
            ("产水电导:超过250μs/cm为异常", _units("C.M.RO{}_Cond_CS@out")),
            ("膜运行通量:19.8-24.2lmh", _units("RO{}_FluxF")),
            ("回收率:70%-80%", _units("RO{}HSL")),
            ("脱盐率:≥97%", _units("RO{}_TYL")),
            (tolerance, ["C.M.RO_PT_ZCS@out"]),
            (tolerance, _units("C.M.RO{}_PT_CS@out")),
        ]
        for desc, names in ro_rules:
            for var in names:
                config[var] = {"source": "ROExcel", "desc": desc}

        # 3. Water-quality sensors.
        water_quality_vars = {
            "C.M.RO_TT_ZJS@out": "水温:5-35℃",
            "C.M.RO_PH_ZJS@out": "进水PH:不在6.0-9.0为异常",
            "C.M.RO_ORP_ZJS@out": "进水ORP:±300mv",
            "C.M.UF_Tur_ZJS@out": "进水浊度:大于10NTU为异常",
            "C.M.UF_Tur_ZCS@out": "产水浊度:大于0.15NTU为异常",
            "C.M.ZH_PH@out": "中和池PH:6.0-9.0",
            "C.M.ZH_ORP@out": "中和池ORP:±300mv",
            "C.M.PH_CIP@out": "CIP清洗液PH:酸1.5-3.0/碱10-12.5",
            "C.M.PH_WGS@out": "外供水PH:不在6.0-10.0为异常",
        }
        for var, desc in water_quality_vars.items():
            config[var] = {"source": "Excel水质规则", "desc": desc}

        # 4. Tank level sensors.
        level_vars = {
            "C.M.LT_JSC@out": "超滤原水池液位:3.0-5.0m",
            "C.M.LT_FXSC@out": "反洗水池液位:3.0-5.0m",
            "C.M.LT_QSC@out": "清水池液位:3.0-5.0m",
            "C.M.LT_ZHC@out": "中和池液位:3.0-5.0m",
            "C.M.LT_HCl@out": "盐酸药箱液位:0.2-1.1m",
            "C.M.LT_NaOH@out": "氢氧化钠药箱液位:0.2-1.1m",
            "C.M.LT_NaClO@out": "次氯酸钠药箱液位:0.2-1.1m",
            "C.M.LT_PAC@out": "絮凝剂药箱液位:0.2-1.1m",
            "C.M.LT_HYJ1@out": "还原剂药箱液位:0.2-1.1m",
            "C.M.LT_HYJ2@out": "还原剂药箱液位:0.2-1.1m",
            "C.M.LT_ZGJ@out": "阻垢剂药箱液位:0.2-1.1m",
            "C.M.LT_SJJ@out": "杀菌剂药箱液位:0.2-1.1m",
        }
        for var, desc in level_vars.items():
            config[var] = {"source": "Excel液位规则", "desc": desc}

        # 5. Pump / fan drive frequencies.
        frequency_vars = (
            _units("C.M.UF_GSB{}_fre@out")
            + _units("C.M.UF_FXB{}_fre@out", 3)
            + _units("C.M.RO_GYB{}_fre@out")
            + _units("C.M.RO_DJB{}_fre@out")
            + _units("C.M.RO_WGB{}_fre@out", 3)
            + _units("C.M.CIP_QXB{}_fre@out", 2)
            + _units("C.M.JYB2_ZGJ{}_fre@out")
        )
        for var in frequency_vars:
            config[var] = {"source": "设备默认规则", "desc": "运行频率:35-65Hz"}

        # 6. Supplemental rules.
        supplemental_vars = {
            "water_in": "超滤总进水量:≥600",
            "C.M.FT_ZJS@out": "总进水流量:≥600",
            "RO_TCHFlow": "RO总产水流量:≥550",
            "RO1_CSFlow": "RO1产水流量:≥150",
            "RO2_CSFlow": "RO2产水流量:≥150",
            "RO3_CSFlow": "RO3产水流量:≥150",
            "RO4_CSFlow": "RO4产水流量:≥150",
            "C.M.UF_ORP_ZCS@out": "超滤总产水ORP:≤100",
            "C.M.UF_PH_ZCS@out": "超滤总产水PH:6.0-9.0",
            "C.M.PT_KYJ@out": "压缩空气压力:≥0.5",
            "C.M.UF_Cl_ZCS@out": "超滤总产水余氯:≤0",
            "RO_TotalFlow": "RO总进水流量:≥700",
            "QSWGB_Flow": "清水外供泵出水流量:≥200",
        }
        for var, desc in supplemental_vars.items():
            config[var] = {"source": "补充规则", "desc": desc}

        return config

    def filter_valid_data(self, raw_data):
        """Return ``raw_data`` as floats with invalid samples set to NaN.

        A sample is kept only when it is finite and strictly positive, so
        NaN, +/-inf, negative values *and zeros* are all replaced by NaN.
        Non-numeric entries are coerced to NaN instead of raising.
        """
        values = np.asarray(pd.to_numeric(raw_data, errors="coerce"), dtype=float)
        with np.errstate(invalid="ignore"):  # NaN comparisons are expected
            keep = np.isfinite(values) & (values > 0)
        return np.where(keep, values, np.nan)

    def _parse_threshold(self, desc):
        """Translate a rule descriptor into a threshold tuple.

        Returns one of:
            ("over", x) / ("gt", x)       abnormal when value > x
            ("not_range", lo, hi)         abnormal outside [lo, hi]
            ("baseline_80_110_120",)      relative-to-median flow rule
            ("percent_range",)            +/- band around the mean
            ((a, b), (c, d))              normal inside either range
            (lo, hi)                      normal inside [lo, hi]; one side
                                          may be -inf / +inf
            (None, None)                  descriptor not understood

        NOTE(review): the original source of this method was truncated
        after the plain ``A-B`` case.  The baseline / percent / one-sided /
        fallback cases below are reconstructed from how the detector and
        the plotting code consume the result - confirm against the intended
        rules.  Descriptors such as "±300mv" and "70%-80%" are deliberately
        left unparsed (-> ``(None, None)``, i.e. no detection) rather than
        guessed at.
        """
        match = _OVER_RE.search(desc)
        if match:
            return ("over", float(match.group(1)))

        match = _NOT_RANGE_RE.search(desc)
        if match:
            return ("not_range", float(match.group(1)), float(match.group(2)))

        match = _GT_RE.search(desc)
        if match:
            return ("gt", float(match.group(1)))

        ranges = [(float(lo), float(hi)) for lo, hi in _RANGE_RE.findall(desc)]
        if len(ranges) == 2:
            # e.g. acid/alkali CIP pH.  Returning only the first range here
            # would make the dual-range handling downstream unreachable.
            return (ranges[0], ranges[1])
        if ranges:
            return ranges[0]

        if "低于基准" in desc:
            return ("baseline_80_110_120",)
        if "±" in desc and "%" in desc:
            return ("percent_range",)

        match = _AT_LEAST_RE.search(desc)
        if match:
            return (float(match.group(1)), float("inf"))
        match = _AT_MOST_RE.search(desc)
        if match:
            return (float("-inf"), float(match.group(1)))

        return (None, None)

    @staticmethod
    def _percent_band(desc, values):
        """Return ``(low, high)``: the mean of ``values`` -/+ 10% (or 20%)."""
        percent = 0.1 if "±10%" in desc else 0.2
        mean_val = np.mean(values)
        return mean_val * (1 - percent), mean_val * (1 + percent)

    def rule_based_detect(self, var_name, valid_data):
        """Flag anomalous samples of ``var_name`` according to its rule.

        Args:
            var_name: Key into ``self.sensor_config``.
            valid_data: 1-D array as produced by ``filter_valid_data``
                (NaN marks invalid samples).

        Returns:
            Boolean array of the same length; True marks an anomaly.  NaN
            samples are never flagged.  When the rule cannot be parsed,
            nothing is flagged.
        """
        desc = self.sensor_config[var_name]["desc"]
        thresholds = self._parse_threshold(desc)

        valid_data = np.asarray(valid_data, dtype=float)
        anomalies = np.zeros(valid_data.shape, dtype=bool)
        valid_mask = ~np.isnan(valid_data)
        if not valid_mask.any():
            return anomalies

        values = valid_data[valid_mask]
        tag = thresholds[0]

        if tag in ("over", "gt"):
            flagged = values > thresholds[1]
        elif tag == "not_range":
            low, high = thresholds[1], thresholds[2]
            flagged = (values < low) | (values > high)
        elif _is_dual_range(thresholds):
            (a, b), (c, d) = thresholds
            flagged = ~(((values >= a) & (values <= b)) | ((values >= c) & (values <= d)))
        elif tag == "baseline_80_110_120":
            # Baseline is the median of the valid samples.
            # NOTE(review): values above 120% of baseline are NOT flagged;
            # this mirrors the literal rule text - confirm it is intended.
            baseline = np.percentile(values, 50)
            flagged = (values < baseline * 0.8) | (
                (values >= baseline * 1.1) & (values <= baseline * 1.2)
            )
        elif tag == "percent_range":
            low, high = self._percent_band(desc, values)
            flagged = (values < low) | (values > high)
        elif tag is not None and thresholds[1] is not None:
            lower, upper = thresholds
            flagged = (values < lower) | (values > upper)
        else:
            return anomalies

        anomalies[valid_mask] = flagged
        return anomalies

    def load_raw_data(self):
        """Load ``data_process_1.csv`` .. ``data_process_104.csv``.

        Keeps only configured sensors and stores one concatenated array per
        sensor in ``self.results["raw_data"]`` (replacing earlier content,
        so the method can be called again).  Unreadable files are reported
        and skipped.

        Raises:
            FileNotFoundError: If none of the expected files exists.
        """
        file_paths = [
            os.path.join(self.data_dir, f"data_process_{i}.csv") for i in range(1, 105)
        ]
        existing_files = [p for p in file_paths if os.path.exists(p)]
        if not existing_files:
            raise FileNotFoundError(
                f" 未在 {self.data_dir} 目录找到数据文件(需命名为data_process_1~104.csv)"
            )

        print(f"找到 {len(existing_files)} 个数据文件,加载中...")
        chunks = {}
        for file in existing_files:
            try:
                df = pd.read_csv(file).iloc[:, 1:]  # drop the leading time column
            except (OSError, ValueError) as e:
                # pandas parser / empty-data / decoding errors are ValueErrors.
                print(f" 加载文件 {os.path.basename(file)} 出错:{e},跳过")
                continue
            for var_name in self.sensor_config:
                if var_name in df.columns:
                    chunks.setdefault(var_name, []).append(df[var_name].values)

        self.results["raw_data"] = {
            var_name: np.concatenate(parts) for var_name, parts in chunks.items()
        }

    def _draw_threshold_lines(self, thresholds, desc, time, values):
        """Draw the threshold line(s) matching ``thresholds`` on the current axes."""
        tag = thresholds[0]
        if tag == "over":
            threshold_val = thresholds[1]
            plt.axhline(y=threshold_val, color='red', linestyle='--', alpha=0.8,
                        label=f'异常阈值:{threshold_val}(超过为异常)')
        elif tag == "not_range":
            low, high = thresholds[1], thresholds[2]
            plt.axhline(y=low, color='green', linestyle='--', alpha=0.8,
                        label=f'正常下限:{low}')
            plt.axhline(y=high, color='green', linestyle='--', alpha=0.8,
                        label=f'正常上限:{high}')
            plt.fill_between(time, low, high, alpha=0.1, color='green', label='正常范围')
        elif tag == "gt":
            threshold_val = thresholds[1]
            plt.axhline(y=threshold_val, color='red', linestyle='--', alpha=0.8,
                        label=f'异常阈值:{threshold_val}(大于为异常)')
        elif _is_dual_range(thresholds):
            (a, b), (c, d) = thresholds
            plt.axhline(y=a, color='g', linestyle='--', alpha=0.6, label=f'酸下限:{a}')
            plt.axhline(y=b, color='g', linestyle='--', alpha=0.6, label=f'酸上限:{b}')
            plt.axhline(y=c, color='g', linestyle='--', alpha=0.6, label=f'碱下限:{c}')
            plt.axhline(y=d, color='g', linestyle='--', alpha=0.6, label=f'碱上限:{d}')
        elif tag == "percent_range":
            # Must be tested before the generic range branch, which would
            # otherwise try to unpack/draw the tag tuple itself.
            low, high = self._percent_band(desc, values)
            plt.axhline(y=low, color='g', linestyle='--', alpha=0.6,
                        label=f'正常下限:{low:.2f}')
            plt.axhline(y=high, color='g', linestyle='--', alpha=0.6,
                        label=f'正常上限:{high:.2f}')
        elif tag == "baseline_80_110_120":
            pass  # no threshold line is drawn for the baseline rule
        elif tag is not None and thresholds[1] is not None:
            lower, upper = thresholds
            if lower != float('-inf'):
                plt.axhline(y=lower, color='g', linestyle='--', alpha=0.6,
                            label=f'正常下限:{lower}')
            if upper != float('inf'):
                plt.axhline(y=upper, color='g', linestyle='--', alpha=0.6,
                            label=f'正常上限:{upper}')

    def plot_anomalies(self, var_name):
        """Plot valid samples, anomalies and threshold lines for ``var_name``.

        Reads ``self.results["valid_data"]`` / ``["anomalies"]`` and saves
        ``<fig_dir>/<var_name>_异常检测图.png``.
        """
        valid_data = self.results["valid_data"][var_name]
        anomalies = self.results["anomalies"][var_name]
        config = self.sensor_config[var_name]

        # X axis in hours, assuming one sample per minute.
        time = np.arange(len(valid_data)) / 60

        valid_mask = ~np.isnan(valid_data)
        anomaly_mask = anomalies & valid_mask

        plt.figure(figsize=(12, 6))
        try:
            plt.plot(time[valid_mask], valid_data[valid_mask], 'b-', alpha=0.8,
                     label='有效数据')
            if np.any(anomaly_mask):
                plt.scatter(time[anomaly_mask], valid_data[anomaly_mask],
                            color='red', s=20, zorder=5, label='异常点')

            thresholds = self._parse_threshold(config["desc"])
            if isinstance(thresholds, tuple):
                self._draw_threshold_lines(
                    thresholds, config["desc"], time, valid_data[valid_mask]
                )

            plt.title(f'{var_name}\n异常标准:{config["desc"]}', fontsize=12, pad=20)
            plt.xlabel('时间(小时)', fontsize=10)
            plt.ylabel('数值', fontsize=10)
            plt.legend(loc='upper right', fontsize=9)
            plt.grid(alpha=0.3)
            plt.tight_layout()

            fig_path = os.path.join(self.fig_dir, f'{var_name}_异常检测图.png')
            plt.savefig(fig_path, dpi=300, bbox_inches='tight', facecolor='white')
        finally:
            plt.close()  # release the figure even if drawing/saving failed
        print(f" 图表保存:{fig_path}")

    def run_detection(self):
        """Run the full pipeline and return ``self.results``.

        Steps: load raw data, filter valid samples, detect anomalies, plot
        whitelisted sensors, write the summary report.
        """
        self.load_raw_data()

        processed_count = 0
        abnormal_count = 0
        plotted_count = 0

        for var_name, config in self.sensor_config.items():
            if var_name not in self.results["raw_data"]:
                print(f"变量 {var_name} 无数据,跳过")
                continue

            print(f"\n 处理传感器:{var_name}")
            raw_data = self.results["raw_data"][var_name]

            valid_data = self.filter_valid_data(raw_data)
            self.results["valid_data"][var_name] = valid_data
            valid_count = int(np.sum(~np.isnan(valid_data)))
            print(f" 有效数据量:{valid_count}/{len(raw_data)}")
            if valid_count == 0:
                continue

            anomalies = self.rule_based_detect(var_name, valid_data)
            self.results["anomalies"][var_name] = anomalies
            processed_count += 1

            anomaly_count = int(np.sum(anomalies))
            anomaly_ratio = (anomaly_count / valid_count) * 100
            if anomaly_count > 0:
                abnormal_count += 1
                print(f" 异常数据:{anomaly_count} 条(占比 {anomaly_ratio:.2f}%)")
            else:
                print(" 无异常数据")

            plotted = var_name in PLOT_WHITELIST
            if plotted:
                self.plot_anomalies(var_name)
                plotted_count += 1

            self.results["threshold_summary"].append({
                "传感器名称": var_name.split("@")[0] if "@" in var_name else var_name,
                "变量标识": var_name,
                "阈值来源": config["source"],
                "异常标准": config["desc"],
                "原始数据总量": len(raw_data),
                "有效数据量": valid_count,
                "异常数据量": anomaly_count,
                "异常比例(%)": round(anomaly_ratio, 2),
                "是否画图": "是" if plotted else "否",
            })

        self._generate_summary_report(processed_count, abnormal_count, plotted_count)
        return self.results

    def _generate_summary_report(self, processed_count, abnormal_count, plotted_count):
        """Write the per-sensor summary CSV and print the run totals."""
        summary_df = pd.DataFrame(self.results["threshold_summary"])
        summary_path = os.path.join(self.result_dir, "传感器异常检测总结报告.csv")
        # utf-8-sig so spreadsheet tools detect the encoding of the headers.
        summary_df.to_csv(summary_path, index=False, encoding="utf-8-sig")

        total_sensors = len(self.sensor_config)
        print("\n" + "=" * 60)
        print(" 检测总结")
        print("=" * 60)
        print(f"总传感器数量(有明确阈值):{total_sensors}")
        print(f"成功处理的传感器:{processed_count}")
        print(f"存在异常的传感器:{abnormal_count}")
        print(f"生成图表的传感器:{plotted_count}(压差类+电导类+水质类)")
        # The per-category counts below are fixed text, not computed.
        print(" - 跨膜压差:4个")
        print(" - 一段/二段压差:8个")
        print(" - 电导类:5个")
        print(" - 水质类(pH+浊度):4个")
        print(f"结果目录:{os.path.abspath(self.result_dir)}")
        print(f"图表目录:{os.path.abspath(self.fig_dir)}")
        print(f"总结报告:{os.path.abspath(summary_path)}")
        print("=" * 60)


if __name__ == "__main__":
    try:
        detector = DirectSensorAnomalyDetector(
            data_dir="datasets_xishan",            # input directory (must exist)
            result_dir="direct_detection_results"  # output directory
        )
        detector.run_detection()
        print("\n 检测完成!已生成压差、电导、水质传感器的异常检测图表")
    except Exception as e:  # top-level boundary: report and exit
        print(f"\n 检测出错:{str(e)}")