| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613 |
- import os
- import re
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- from matplotlib.font_manager import FontProperties
- # -------------------------- 彻底解决字体警告问题 --------------------------
def setup_chinese_font():
    """Select a matplotlib font family from a list of per-OS font files.

    The candidates are tried in order; the first file whose family name can
    be read becomes ``rcParams["font.family"]``.  If none can be read, the
    family falls back to ``["DejaVu Sans", "sans-serif"]``.  In every case
    ``axes.unicode_minus`` is switched off so that minus signs render with
    the selected font.
    """
    candidate_fonts = (
        "C:/Windows/Fonts/simhei.ttf",  # Windows: SimHei
        "/System/Library/Fonts/PingFang.ttc",  # macOS: PingFang
        # NOTE(review): this is Liberation Sans, a Latin typeface, so Chinese
        # labels will presumably still lack glyphs on Linux - confirm and
        # consider pointing at an installed CJK font instead.
        "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
    )

    for font_path in candidate_fonts:
        try:
            family = FontProperties(fname=font_path).get_name()
        except Exception:
            # Best-effort probing: a missing or unreadable font file is the
            # normal case on the "other" platforms, so move on.  Unlike a
            # bare ``except`` this lets KeyboardInterrupt/SystemExit through.
            continue
        plt.rcParams["font.family"] = family
        break
    else:
        # No candidate was usable: fall back to matplotlib's bundled font.
        plt.rcParams["font.family"] = ["DejaVu Sans", "sans-serif"]

    plt.rcParams["axes.unicode_minus"] = False  # render "-" with the chosen font


# Configure fonts once at import time, before any figure is created.
setup_chinese_font()
- # -------------------------- 核心配置:需画图的传感器白名单(新增4个水质传感器) --------------------------
PLOT_WHITELIST = [
    # 1. UF trans-membrane pressure difference, trains 1-4
    *(f"C.M.UF{train}_DB@press_PV" for train in range(1, 5)),
    # 2. RO first-stage pressure difference, trains 1-4
    *(f"C.M.RO{train}_DB@DPT_1" for train in range(1, 5)),
    # 3. RO second-stage pressure difference, trains 1-4
    *(f"C.M.RO{train}_DB@DPT_2" for train in range(1, 5)),
    # 4. RO total feed conductivity (single sensor)
    "C.M.RO_Cond_ZJS@out",
    # 5. RO permeate conductivity, trains 1-4
    *(f"C.M.RO{train}_Cond_CS@out" for train in range(1, 5)),
    # 6. Water-quality sensors that are plotted as well
    "C.M.RO_PH_ZJS@out",   # feed pH: abnormal outside 6-9
    "C.M.UF_Tur_ZJS@out",  # feed turbidity: abnormal above 10
    "C.M.UF_Tur_ZCS@out",  # product turbidity: abnormal above 0.15
    "C.M.PH_WGS@out",      # supplied-water pH: abnormal outside 6-10
]
- # -------------------------- 传感器异常检测类 --------------------------
class DirectSensorAnomalyDetector:
    """Rule-based anomaly detector for UF/RO plant sensor data.

    Each sensor tag is mapped to a rule description string (``desc``).  The
    description is turned into a threshold by :meth:`_parse_threshold`,
    applied by :meth:`rule_based_detect`, optionally plotted by
    :meth:`plot_anomalies`, and summarised in a CSV report.
    """

    # Rule text shared by every sensor that only has a relative tolerance.
    _DEFAULT_DESC = "正常范围±10%-20%"

    def __init__(self, data_dir="datasets_xishan", result_dir="direct_detection_results"):
        """Create the output directories and the empty result containers.

        Args:
            data_dir: Directory holding the ``data_process_<i>.csv`` files.
            result_dir: Directory for the CSV report; figures are written to
                its ``figures`` sub-directory.
        """
        self.data_dir = data_dir
        self.result_dir = result_dir
        self.fig_dir = os.path.join(result_dir, "figures")
        os.makedirs(self.result_dir, exist_ok=True)
        os.makedirs(self.fig_dir, exist_ok=True)

        # tag -> {"source": ..., "desc": ...}; only sensors with an explicit rule.
        self.sensor_config = self._build_sensor_config()

        self.results = {
            "raw_data": {},           # tag -> concatenated raw readings
            "valid_data": {},         # tag -> readings with invalid points set to NaN
            "anomalies": {},          # tag -> boolean mask, True = anomalous
            "threshold_summary": [],  # one dict per processed sensor
        }

    @staticmethod
    def _numbered(template, count=4):
        """Expand ``template`` (one ``{}`` slot) into tags numbered 1..count."""
        return [template.format(index) for index in range(1, count + 1)]

    @staticmethod
    def _is_double_range(thresholds):
        """Return True for the ``((a, b), (c, d))`` two-window threshold shape."""
        return len(thresholds) == 2 and all(isinstance(part, tuple) for part in thresholds)

    def _build_sensor_config(self):
        """Build the tag -> ``{"source", "desc"}`` mapping for all sensors.

        Insertion order is significant: :meth:`run_detection` processes the
        sensors, and writes the report rows, in this order.
        """
        numbered = self._numbered
        default = self._DEFAULT_DESC
        config = {}

        def register(source, desc, tags):
            for tag in tags:
                config[tag] = {"source": source, "desc": desc}

        # 1. Ultrafiltration (UF) sensors.
        uf_groups = [
            (numbered("C.M.UF{}_DB@press_PV"), "跨膜压差:超过0.06Mpa为异常"),
            (numbered("C.M.UF{}_FT_JS@out"), "进水流量:低于基准80%或110%-120%"),
            (numbered("UF{}_FluxF"), "膜运行通量:49.5-60.5lmh"),
            (numbered("UF{}Per"), "渗透率:247.5-302.5lmh/bar"),
            (numbered("C.M.UF{}_DW@press_PV"), default),
            (numbered("C.M.UF{}_PT_CS@out"), default),
            (
                [
                    "C.M.UF_FT_ZCS@out",
                    "C.M.UF_PT_ZCS@out",
                    "C.M.UF_PT_ZJS@out",
                    "C.M.UF_FT_FX@out",
                ],
                default,
            ),
        ]
        for tags, desc in uf_groups:
            register("UFExcel", desc, tags)

        # 2. Reverse-osmosis (RO) sensors.  The stage pressure-difference
        # limit is 0.3 (the source notes it was raised from 0.12).
        ro_groups = [
            (numbered("C.M.RO{}_DB@DPT_1"), "段间压差:超过0.3Mpa为异常"),
            (numbered("C.M.RO{}_DB@DPT_2"), "段间压差:超过0.3Mpa为异常"),
            (numbered("C.M.RO{}_PT_JS@out"), default),
            (numbered("C.M.RO{}_FT_NS@out"), default),
            (["C.M.RO_Cond_ZJS@out"], "总进水电导:超过4000μs/cm为异常"),
            (numbered("C.M.RO{}_Cond_CS@out"), "产水电导:超过250μs/cm为异常"),
            (numbered("RO{}_FluxF"), "膜运行通量:19.8-24.2lmh"),
            (numbered("RO{}HSL"), "回收率:70%-80%"),
            (numbered("RO{}_TYL"), "脱盐率:≥97%"),
            (["C.M.RO_PT_ZCS@out"], default),
            (numbered("C.M.RO{}_PT_CS@out"), default),
        ]
        for tags, desc in ro_groups:
            register("ROExcel", desc, tags)

        # 3. Water-quality sensors (four of them are also plotted).
        water_quality_vars = {
            "C.M.RO_TT_ZJS@out": "水温:5-35℃",
            "C.M.RO_PH_ZJS@out": "进水PH:不在6.0-9.0为异常",
            "C.M.RO_ORP_ZJS@out": "进水ORP:±300mv",
            "C.M.UF_Tur_ZJS@out": "进水浊度:大于10NTU为异常",
            "C.M.UF_Tur_ZCS@out": "产水浊度:大于0.15NTU为异常",
            "C.M.ZH_PH@out": "中和池PH:6.0-9.0",
            "C.M.ZH_ORP@out": "中和池ORP:±300mv",
            "C.M.PH_CIP@out": "CIP清洗液PH:酸1.5-3.0/碱10-12.5",
            "C.M.PH_WGS@out": "外供水PH:不在6.0-10.0为异常",
        }
        for tag, desc in water_quality_vars.items():
            register("Excel水质规则", desc, [tag])

        # 4. Level sensors.
        level_vars = {
            "C.M.LT_JSC@out": "超滤原水池液位:3.0-5.0m",
            "C.M.LT_FXSC@out": "反洗水池液位:3.0-5.0m",
            "C.M.LT_QSC@out": "清水池液位:3.0-5.0m",
            "C.M.LT_ZHC@out": "中和池液位:3.0-5.0m",
            "C.M.LT_HCl@out": "盐酸药箱液位:0.2-1.1m",
            "C.M.LT_NaOH@out": "氢氧化钠药箱液位:0.2-1.1m",
            "C.M.LT_NaClO@out": "次氯酸钠药箱液位:0.2-1.1m",
            "C.M.LT_PAC@out": "絮凝剂药箱液位:0.2-1.1m",
            "C.M.LT_HYJ1@out": "还原剂药箱液位:0.2-1.1m",
            "C.M.LT_HYJ2@out": "还原剂药箱液位:0.2-1.1m",
            "C.M.LT_ZGJ@out": "阻垢剂药箱液位:0.2-1.1m",
            "C.M.LT_SJJ@out": "杀菌剂药箱液位:0.2-1.1m",
        }
        for tag, desc in level_vars.items():
            register("Excel液位规则", desc, [tag])

        # 5. Pump / blower frequency sensors.
        frequency_tags = (
            numbered("C.M.UF_GSB{}_fre@out", 4)
            + numbered("C.M.UF_FXB{}_fre@out", 3)
            + numbered("C.M.RO_GYB{}_fre@out", 4)
            + numbered("C.M.RO_DJB{}_fre@out", 4)
            + numbered("C.M.RO_WGB{}_fre@out", 3)
            + numbered("C.M.CIP_QXB{}_fre@out", 2)
            + numbered("C.M.JYB2_ZGJ{}_fre@out", 4)
        )
        register("设备默认规则", "运行频率:35-65Hz", frequency_tags)

        # 6. Supplemental rules.
        supplemental_vars = {
            "water_in": "超滤总进水量:≥600",
            "C.M.FT_ZJS@out": "总进水流量:≥600",
            "RO_TCHFlow": "RO总产水流量:≥550",
            "RO1_CSFlow": "RO1产水流量:≥150",
            "RO2_CSFlow": "RO2产水流量:≥150",
            "RO3_CSFlow": "RO3产水流量:≥150",
            "RO4_CSFlow": "RO4产水流量:≥150",
            "C.M.UF_ORP_ZCS@out": "超滤总产水ORP:≤100",
            "C.M.UF_PH_ZCS@out": "超滤总产水PH:6.0-9.0",
            "C.M.PT_KYJ@out": "压缩空气压力:≥0.5",
            "C.M.UF_Cl_ZCS@out": "超滤总产水余氯:≤0",
            "RO_TotalFlow": "RO总进水流量:≥700",
            "QSWGB_Flow": "清水外供泵出水流量:≥200",
        }
        for tag, desc in supplemental_vars.items():
            register("补充规则", desc, [tag])

        return config

    def filter_valid_data(self, raw_data):
        """Return ``raw_data`` as floats with invalid readings replaced by NaN.

        A reading is kept only when it is finite and strictly positive, so
        NaN, +/-inf, negative values and zeros all become NaN.

        NOTE(review): because non-positive readings are dropped here, rules
        with a non-positive bound (e.g. the "±300" and "≤0" descriptions)
        never see such readings - confirm this is intended.
        """
        values = np.asarray(raw_data, dtype=float)
        with np.errstate(invalid="ignore"):
            keep = np.isfinite(values) & (values > 0)
        return np.where(keep, values, np.nan)

    def _parse_threshold(self, desc):
        """Translate a rule description into a threshold tuple.

        Returns one of:
            ``("over", x)`` / ``("gt", x)``  - abnormal above ``x``
            ``("not_range", lo, hi)``        - abnormal outside ``[lo, hi]``
            ``((a, b), (c, d))``             - normal inside either window
            ``("baseline_80_110_120",)``     - relative to the data's median
            ``("percent_range", desc)``      - relative to the data's mean
            ``(lower, upper)``               - plain numeric range (may be infinite)
            ``(None, None)``                 - nothing recognised
        """
        num = r"(\d+\.?\d*)"  # unsigned decimal number

        # Explicit "abnormal when ..." wordings.
        match = re.search("超过" + num, desc)
        if match:
            return ("over", float(match.group(1)))

        match = re.search("不在" + num + "-" + num, desc)
        if match:
            return ("not_range", float(match.group(1)), float(match.group(2)))

        match = re.search("大于" + num, desc)
        if match:
            return ("gt", float(match.group(1)))

        # Special forms must be recognised before the generic patterns
        # below, which would otherwise match a fragment of them: the plain
        # range pattern matches the acid window of the two-window rule, and
        # the "±X" pattern matches the "±10" of "±10%-20%".
        match = re.search("酸" + num + "-" + num + "/碱" + num + "-" + num, desc)
        if match:
            a, b, c, d = map(float, match.groups())
            return ((a, b), (c, d))

        if "低于基准80%" in desc and "110%-120%" in desc:
            return ("baseline_80_110_120",)

        if "±10%" in desc or "±20%" in desc:
            return ("percent_range", desc)

        # Plain range "A-B"; the optional "%" lets "70%-80%" parse as (70, 80).
        match = re.search(num + "%?-" + num, desc)
        if match:
            return (float(match.group(1)), float(match.group(2)))

        match = re.search("<" + num, desc)
        if match:
            # Strictly-less-than expressed as a closed range just below X.
            return (0.0, float(match.group(1)) - 1e-9)

        match = re.search("≥" + num, desc)
        if match:
            return (float(match.group(1)), float("inf"))

        match = re.search("≤" + num, desc)
        if match:
            return (float("-inf"), float(match.group(1)))

        match = re.search("±" + num, desc)
        if match:
            bound = float(match.group(1))
            return (-bound, bound)

        return (None, None)

    def rule_based_detect(self, var_name, valid_data):
        """Flag anomalous readings of ``var_name`` according to its rule.

        Args:
            var_name: Sensor tag; must be a key of ``self.sensor_config``.
            valid_data: Readings with invalid points already set to NaN.

        Returns:
            Boolean array shaped like ``valid_data``; True marks an anomaly.
            NaN positions are never flagged.
        """
        desc = self.sensor_config[var_name]["desc"]
        thresholds = self._parse_threshold(desc)

        valid_mask = ~np.isnan(valid_data)
        anomalies = np.zeros_like(valid_data, dtype=bool)
        if not np.any(valid_mask):
            print(f" 变量 {var_name} 无有效数据,跳过检测")
            return anomalies

        values = valid_data[valid_mask]
        kind = thresholds[0]

        if kind in ("over", "gt"):
            flagged = values > thresholds[1]
        elif kind == "not_range":
            low, high = thresholds[1], thresholds[2]
            flagged = (values < low) | (values > high)
        elif self._is_double_range(thresholds):
            (a, b), (c, d) = thresholds
            in_first = (values >= a) & (values <= b)
            in_second = (values >= c) & (values <= d)
            flagged = ~(in_first | in_second)
        elif kind == "baseline_80_110_120":
            # The baseline is the median of the valid readings.
            # NOTE(review): readings above 120% of the baseline are not
            # flagged by this rule - confirm that matches the specification.
            baseline = np.percentile(values, 50)
            flagged = (values < baseline * 0.8) | (
                (values >= baseline * 1.1) & (values <= baseline * 1.2)
            )
        elif kind == "percent_range":
            percent = 0.1 if "±10%" in desc else 0.2
            mean_val = np.mean(values)
            flagged = (values < mean_val * (1 - percent)) | (values > mean_val * (1 + percent))
        elif len(thresholds) == 2 and None not in thresholds:
            lower, upper = thresholds
            flagged = (values < lower) | (values > upper)
        else:
            # No usable rule could be parsed: nothing is flagged.
            return anomalies

        anomalies[valid_mask] = flagged
        return anomalies

    def load_raw_data(self):
        """Load ``data_process_1.csv`` .. ``data_process_104.csv`` from ``data_dir``.

        For every configured sensor present in a file, its column is coerced
        to float (unparseable cells become NaN) and the per-file pieces are
        concatenated in file order into ``self.results["raw_data"]``.  A file
        that fails to load is reported and skipped as a whole.  Calling this
        method again replaces the previously loaded data.

        Raises:
            FileNotFoundError: If none of the expected files exists.
        """
        file_paths = [os.path.join(self.data_dir, f"data_process_{i}.csv") for i in range(1, 105)]
        existing_files = [path for path in file_paths if os.path.exists(path)]

        if not existing_files:
            raise FileNotFoundError(
                f" 未在 {self.data_dir} 目录找到数据文件(需命名为data_process_1~104.csv)"
            )

        print(f"找到 {len(existing_files)} 个数据文件,加载中...")
        pieces = {}
        for file in existing_files:
            try:
                df = pd.read_csv(file).iloc[:, 1:]  # drop the first (time) column
                # Build the whole file's contribution first so that a failure
                # cannot leave some sensors with this file and others without.
                per_file = {
                    var_name: pd.to_numeric(df[var_name], errors="coerce").to_numpy(dtype=float)
                    for var_name in self.sensor_config
                    if var_name in df.columns
                }
            except Exception as e:
                # Deliberate best effort: one bad file must not abort the run.
                print(f" 加载文件 {os.path.basename(file)} 出错:{str(e)},跳过")
                continue
            for var_name, readings in per_file.items():
                pieces.setdefault(var_name, []).append(readings)

        raw = self.results["raw_data"]
        raw.clear()
        raw.update({var_name: np.concatenate(parts) for var_name, parts in pieces.items()})

    def _draw_threshold_lines(self, thresholds, time, values, desc):
        """Draw the threshold lines matching ``thresholds`` on the current axes.

        ``values`` are the valid (non-NaN) readings; they are only used by
        the data-relative rules (baseline and percent range).
        """
        kind = thresholds[0]
        green = {"color": "g", "linestyle": "--", "alpha": 0.6}

        if kind in ("over", "gt"):
            note = "超过为异常" if kind == "over" else "大于为异常"
            limit = thresholds[1]
            plt.axhline(y=limit, color='red', linestyle='--', alpha=0.8,
                        label=f'异常阈值:{limit}({note})')
        elif kind == "not_range":
            low, high = thresholds[1], thresholds[2]
            plt.axhline(y=low, color='green', linestyle='--', alpha=0.8, label=f'正常下限:{low}')
            plt.axhline(y=high, color='green', linestyle='--', alpha=0.8, label=f'正常上限:{high}')
            plt.fill_between(time, low, high, alpha=0.1, color='green', label='正常范围')
        elif self._is_double_range(thresholds):
            (a, b), (c, d) = thresholds
            plt.axhline(y=a, label=f'酸下限:{a}', **green)
            plt.axhline(y=b, label=f'酸上限:{b}', **green)
            plt.axhline(y=c, label=f'碱下限:{c}', **green)
            plt.axhline(y=d, label=f'碱上限:{d}', **green)
        elif kind == "baseline_80_110_120":
            # One-element tuple: must be handled before anything that
            # indexes thresholds[1].
            baseline = np.percentile(values, 50)
            plt.axhline(y=baseline * 0.8, label=f'基准80%:{baseline * 0.8:.2f}', **green)
            plt.axhline(y=baseline * 1.1, label=f'基准110%:{baseline * 1.1:.2f}', **green)
            plt.axhline(y=baseline * 1.2, label=f'基准120%:{baseline * 1.2:.2f}', **green)
        elif kind == "percent_range":
            percent = 0.1 if "±10%" in desc else 0.2
            mean_val = np.mean(values)
            low = mean_val * (1 - percent)
            high = mean_val * (1 + percent)
            plt.axhline(y=low, label=f'正常下限:{low:.2f}', **green)
            plt.axhline(y=high, label=f'正常上限:{high:.2f}', **green)
        elif len(thresholds) == 2 and None not in thresholds:
            lower, upper = thresholds
            # Open-ended bounds (>= / <= rules) have no line to draw.
            if lower != float('-inf'):
                plt.axhline(y=lower, label=f'正常下限:{lower}', **green)
            if upper != float('inf'):
                plt.axhline(y=upper, label=f'正常上限:{upper}', **green)

    def plot_anomalies(self, var_name):
        """Plot valid readings, anomalies and threshold lines for one sensor.

        Reads ``valid_data`` and ``anomalies`` for ``var_name`` from
        ``self.results`` and saves ``<var_name>_异常检测图.png`` in ``fig_dir``.
        """
        valid_data = self.results["valid_data"][var_name]
        anomalies = self.results["anomalies"][var_name]
        config = self.sensor_config[var_name]

        # X axis in hours, derived from the sample index.
        # NOTE(review): assumes one sample per minute - confirm against the data.
        time = np.arange(len(valid_data)) / 60

        valid_mask = ~np.isnan(valid_data)
        anomaly_mask = anomalies & valid_mask

        fig = plt.figure(figsize=(12, 6))
        try:
            plt.plot(time[valid_mask], valid_data[valid_mask], 'b-', alpha=0.8, label='有效数据')

            if np.any(anomaly_mask):
                plt.scatter(time[anomaly_mask], valid_data[anomaly_mask],
                            color='red', s=20, zorder=5, label='异常点')

            thresholds = self._parse_threshold(config["desc"])
            self._draw_threshold_lines(thresholds, time, valid_data[valid_mask], config["desc"])

            plt.title(f'{var_name}\n异常标准:{config["desc"]}', fontsize=12, pad=20)
            plt.xlabel('时间(小时)', fontsize=10)
            plt.ylabel('数值', fontsize=10)
            plt.legend(loc='upper right', fontsize=9)
            plt.grid(alpha=0.3)
            plt.tight_layout()

            fig_path = os.path.join(self.fig_dir, f'{var_name}_异常检测图.png')
            plt.savefig(fig_path, dpi=300, bbox_inches='tight', facecolor='white')
        finally:
            # Release the figure even when drawing or saving fails.
            plt.close(fig)
        print(f" 图表保存:{fig_path}")

    def run_detection(self):
        """Run the full pipeline and return ``self.results``.

        Steps: load the raw data, filter each configured sensor, detect
        anomalies, plot the sensors listed in ``PLOT_WHITELIST``, and write
        the summary report.
        """
        self.load_raw_data()

        # Start from clean derived results so that a repeated run does not
        # duplicate report rows.
        self.results["valid_data"].clear()
        self.results["anomalies"].clear()
        self.results["threshold_summary"].clear()

        processed_count = 0
        abnormal_count = 0
        plotted_count = 0
        for var_name, config in self.sensor_config.items():
            if var_name not in self.results["raw_data"]:
                print(f"变量 {var_name} 无数据,跳过")
                continue

            print(f"\n 处理传感器:{var_name}")
            raw_data = self.results["raw_data"][var_name]

            valid_data = self.filter_valid_data(raw_data)
            self.results["valid_data"][var_name] = valid_data
            valid_count = int(np.sum(~np.isnan(valid_data)))
            print(f" 有效数据量:{valid_count}/{len(raw_data)}")

            if valid_count == 0:
                continue

            anomalies = self.rule_based_detect(var_name, valid_data)
            self.results["anomalies"][var_name] = anomalies
            processed_count += 1

            anomaly_count = int(np.sum(anomalies))
            anomaly_ratio = (anomaly_count / valid_count) * 100
            if anomaly_count > 0:
                abnormal_count += 1
                print(f" 异常数据:{anomaly_count} 条(占比 {anomaly_ratio:.2f}%)")
            else:
                print(f" 无异常数据")

            should_plot = var_name in PLOT_WHITELIST
            if should_plot:
                self.plot_anomalies(var_name)
                plotted_count += 1

            self.results["threshold_summary"].append({
                "传感器名称": var_name.split("@")[0],
                "变量标识": var_name,
                "阈值来源": config["source"],
                "异常标准": config["desc"],
                "原始数据总量": len(raw_data),
                "有效数据量": valid_count,
                "异常数据量": anomaly_count,
                "异常比例(%)": round(anomaly_ratio, 2),
                "是否画图": "是" if should_plot else "否",
            })

        self._generate_summary_report(processed_count, abnormal_count, plotted_count)
        return self.results

    def _generate_summary_report(self, processed_count, abnormal_count, plotted_count):
        """Write the per-sensor summary CSV and print the run totals."""
        summary_df = pd.DataFrame(self.results["threshold_summary"])
        summary_path = os.path.join(self.result_dir, "传感器异常检测总结报告.csv")
        # utf-8-sig writes a BOM ahead of the CSV content.
        summary_df.to_csv(summary_path, index=False, encoding="utf-8-sig")

        total_sensors = len(self.sensor_config)
        print(f"\n" + "="*60)
        print(f" 检测总结")
        print(f"="*60)
        print(f"总传感器数量(有明确阈值):{total_sensors}")
        print(f"成功处理的传感器:{processed_count}")
        print(f"存在异常的传感器:{abnormal_count}")
        print(f"生成图表的传感器:{plotted_count}(压差类+电导类+水质类)")
        # NOTE(review): the four counts below are fixed text, not derived
        # from what was actually plotted in this run.
        print(f" - 跨膜压差:4个")
        print(f" - 一段/二段压差:8个")
        print(f" - 电导类:5个")
        print(f" - 水质类(pH+浊度):4个")
        print(f"结果目录:{os.path.abspath(self.result_dir)}")
        print(f"图表目录:{os.path.abspath(self.fig_dir)}")
        print(f"总结报告:{os.path.abspath(summary_path)}")
        print(f"="*60)
if __name__ == "__main__":
    # Script entry point: run the whole pipeline with the default directories.
    try:
        detector = DirectSensorAnomalyDetector(
            data_dir="datasets_xishan",            # input directory; must already exist
            result_dir="direct_detection_results"  # output directory; created if missing
        )
        detector.run_detection()
    except Exception as e:
        print(f"\n 检测出错:{str(e)}")
        # Exit with a non-zero status so shells and schedulers can tell that
        # the run failed instead of seeing a clean exit.
        raise SystemExit(1) from e
    else:
        print("\n 检测完成!已生成压差、电导、水质传感器的异常检测图表")
|