import os import pandas as pd import numpy as np import joblib from sklearn.preprocessing import MinMaxScaler from sklearn.ensemble import IsolationForest from sklearn.svm import OneClassSVM # 设置中文字体显示(用于本地可视化时中文不乱码) import matplotlib.pyplot as plt plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"] # 数据文件夹路径(批量 CSV 存放目录) data_folder = "datasets_export_xishan" # 定义要读取的文件模板与对应列名(逐批次 data_exportX_{i}.csv, i=1..26) # 关键字段含义(业务约定): # - UF1Per:UF1膜渗透率(UF1Per) # - C.M.RO1_DB@DPT_1:RO1一段压差 # - C.M.RO1_DB@DPT_2:RO1二段压差 # - RO1_CSFlow:RO1产水流量 file_info = { "data_export5_{}.csv": ["UF1Per", "UF2Per", "UF3Per", "UF4Per"], "data_export8_{}.csv": ["C.M.RO1_DB@DPT_1", "C.M.RO1_DB@DPT_2", "C.M.RO2_DB@DPT_1", "C.M.RO2_DB@DPT_2"], "data_export9_{}.csv": ["C.M.RO3_DB@DPT_1", "C.M.RO3_DB@DPT_2", "C.M.RO4_DB@DPT_1", "C.M.RO4_DB@DPT_2"], "data_export11_{}.csv": ["RO1_CSFlow", "RO2_CSFlow", "RO3_CSFlow", "RO4_CSFlow"] } def load_and_merge_data(): """加载并合并所有数据文件 - 按 file_info 中的模板逐列读取各批 CSV(i=1..26),仅选取关心的指标列 - 将成功读取的数据 DataFrame 纵向拼接(ignore_index=True) - 返回合并后的 DataFrame;若无任何成功数据则报错 """ all_data = [] # 循环读取每个文件模板和对应的编号1-26 for file_template, columns in file_info.items(): for i in range(1, 27): # 构建完整的文件路径 filename = file_template.format(i) file_path = os.path.join(data_folder, filename) try: # 读取CSV文件的指定列(仅保留关心指标,减小内存开销) df = pd.read_csv(file_path, usecols=columns) all_data.append(df) print(f"成功读取: {filename}") except Exception as e: print(f"读取文件 {filename} 时出错: {e}") # 合并所有数据(纵向堆叠) if not all_data: raise ValueError("没有成功读取任何数据文件") merged_df = pd.concat(all_data, ignore_index=True) print(f"数据合并完成,总样本数: {len(merged_df)}") return merged_df def normalize_data(df): """对数据进行归一化处理(逐列 Min-Max 到 [0,1]) - 拟合并转换每一列;保存 scaler 至 `scaler.pkl` - 返回归一化后的 DataFrame 以及 scaler(便于线上/后续反归一化) """ scaler = MinMaxScaler() scaled_data = scaler.fit_transform(df) scaled_df = pd.DataFrame(scaled_data, columns=df.columns) # 保存归一化器 joblib.dump(scaler, "scaler.pkl") print("归一化器已保存为 scaler.pkl") return scaled_df, scaler class IsolationForestModel: """孤立森林异常检测模型(逐列一维检测) - 特点:无需标注,适合检测孤立点;contamination='auto' 自动估计比例 - 输出:predict → -1 表示异常,1 表示正常 """ def __init__(self): self.models = {} # 存储每列的模型 def fit(self, df): """逐列训练孤立森林模型 参数: - df: 归一化后的 DataFrame(每列为一个监测指标) """ for column in df.columns: # 准备数据(sklearn 需要二维输入) X = df[column].values.reshape(-1, 1) # 训练模型 model = IsolationForest(n_estimators=100, contamination='auto', random_state=42) model.fit(X) self.models[column] = model print(f"已训练 {column} 的孤立森林模型") return self def predict(self, df): """预测异常值,-1 表示异常,1 表示正常(逐列)""" results = pd.DataFrame() for column in df.columns: if column not in self.models: raise ValueError(f"没有 {column} 的模型,请先训练") X = df[column].values.reshape(-1, 1) results[column] = self.models[column].predict(X) return results def save(self, filename="isolation_forest_models.pkl"): """保存模型""" joblib.dump(self, filename) print(f"孤立森林模型已保存为 {filename}") class ThreeSigmaModel: """3σ 异常检测模型(逐列基于均值±nσ 的阈值法) - 特点:简单、可解释;可调 n_sigma(默认 3) - 输出:-1 表示异常,1 表示正常 """ def __init__(self): self.stats = {} # 存储每列的均值和标准差 def fit(self, df): """计算每列的均值和标准差,并缓存统计量""" for column in df.columns: mean = df[column].mean() std = df[column].std() self.stats[column] = (mean, std) print(f"已计算 {column} 的3σ统计量") return self def predict(self, df, n_sigma=3): """预测异常值(-1=异常,1=正常) 参数: - df: 归一化后的 DataFrame - n_sigma: 阈值宽度因子,默认 3,即 mean ± 3*std """ results = pd.DataFrame() for column in df.columns: if column not in self.stats: raise ValueError(f"没有 {column} 的统计量,请先训练") mean, std = self.stats[column] # 计算上下限 lower_bound = mean - n_sigma * std upper_bound = mean + n_sigma * std # 判断异常值 is_outlier = (df[column] < lower_bound) | (df[column] > upper_bound) # 转换为-1(异常)和1(正常) results[column] = np.where(is_outlier, -1, 1) return results def save(self, filename="three_sigma_model.pkl"): """保存模型""" joblib.dump(self, filename) print(f"3σ模型已保存为 {filename}") ''' class OneClassSVMModel: """One-Class SVM 异常检测模型(可选) - 对复杂边界更有表达力,但对参数与尺度敏感 - 启用前请确保样本量与特征尺度合适 """ def __init__(self): self.models = {} # 存储每列的模型 def fit(self, df): """逐列训练 One-Class SVM 模型""" for column in df.columns: # 准备数据(需要二维数组) X = df[column].values.reshape(-1, 1) # 训练模型 model = OneClassSVM(nu=0.05, kernel='rbf', gamma='scale') model.fit(X) self.models[column] = model print(f"已训练 {column} 的One-Class SVM模型") return self def predict(self, df): """预测异常值,-1 表示异常,1 表示正常""" results = pd.DataFrame() for column in df.columns: if column not in self.models: raise ValueError(f"没有 {column} 的模型,请先训练") X = df[column].values.reshape(-1, 1) results[column] = self.models[column].predict(X) return results def save(self, filename="one_class_svm_models.pkl"): """保存模型""" joblib.dump(self, filename) print(f"One-Class SVM模型已保存为 {filename}") ''' def main(): # 1. 加载并合并数据(仅读取关心指标列) print("开始加载数据...") merged_data = load_and_merge_data() # 2. 数据归一化(逐列 Min-Max 到 [0,1] 并保存 scaler) print("\n开始数据归一化...") normalized_data, scaler = normalize_data(merged_data) # 3. 训练并保存孤立森林模型(逐列训练) print("\n开始训练孤立森林模型...") if_model = IsolationForestModel() if_model.fit(normalized_data) if_model.save() # 4. 训练并保存3σ模型(逐列计算统计量) print("\n开始训练3σ模型...") ts_model = ThreeSigmaModel() ts_model.fit(normalized_data) ts_model.save() # 5. 训练并保存 One-Class SVM 模型(可选,默认已注释) # 如需启用,请取消下方注释和类定义(第166-203行)的注释 # print("\n开始训练One-Class SVM模型...") # ocsvm_model = OneClassSVMModel() # ocsvm_model.fit(normalized_data) # ocsvm_model.save() print("\n所有模型训练和保存完成!") # 使用模型进行预测(示例:随机抽样 100 条) sample_data = normalized_data.sample(min(100, len(normalized_data))) # 随机取100个样本或全部样本(如果不足100个) # 孤立森林预测 if_predictions = if_model.predict(sample_data) print("\n孤立森林预测结果(-1表示异常,1表示正常):") print(if_predictions) # 3σ预测 ts_predictions = ts_model.predict(sample_data) print("\n3σ预测结果(-1表示异常,1表示正常):") print(ts_predictions) # One-Class SVM预测(可选,默认已注释) # 如需启用,请取消下方注释 # ocsvm_predictions = ocsvm_model.predict(sample_data) # print("\nOne-Class SVM预测结果(-1表示异常,1表示正常):") # print(ocsvm_predictions) if __name__ == "__main__": main()