import os

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import MinMaxScaler
from sklearn.svm import OneClassSVM

# Configure fonts able to render Chinese characters in matplotlib output.
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]

# Folder containing the exported CSV datasets.
data_folder = "datasets_export_xishan"

# Map each file-name template to the columns to extract from it.
# Each template is instantiated with indices 1..26 (see load_and_merge_data).
file_info = {
    "data_export5_{}.csv": ["UF1Per", "UF2Per", "UF3Per", "UF4Per"],
    "data_export8_{}.csv": ["C.M.RO1_DB@DPT_1", "C.M.RO1_DB@DPT_2",
                            "C.M.RO2_DB@DPT_1", "C.M.RO2_DB@DPT_2"],
    "data_export9_{}.csv": ["C.M.RO3_DB@DPT_1", "C.M.RO3_DB@DPT_2",
                            "C.M.RO4_DB@DPT_1", "C.M.RO4_DB@DPT_2"],
    "data_export11_{}.csv": ["RO1_CSFlow", "RO2_CSFlow", "RO3_CSFlow", "RO4_CSFlow"],
}


def load_and_merge_data():
    """Load all data files (each template x indices 1-26) and concatenate them.

    Unreadable/missing files are skipped with a warning (best-effort loading).

    Returns:
        pd.DataFrame: all successfully read frames stacked row-wise.

    Raises:
        ValueError: if no file could be read at all.
    """
    # NOTE(review): the four templates declare disjoint column sets, so this
    # row-wise concat produces a frame whose columns are the union of all
    # sets, padded with NaN. MinMaxScaler (used downstream) rejects NaN input
    # — confirm the intended merge layout (row-wise vs column-wise) against
    # the real CSV contents.
    all_data = []
    for file_template, columns in file_info.items():
        for i in range(1, 27):
            filename = file_template.format(i)
            file_path = os.path.join(data_folder, filename)
            try:
                # Read only the columns of interest from this CSV.
                df = pd.read_csv(file_path, usecols=columns)
                all_data.append(df)
                print(f"成功读取: {filename}")
            except Exception as e:
                # Skip unreadable files but report them so gaps are visible.
                print(f"读取文件 {filename} 时出错: {e}")
    if not all_data:
        raise ValueError("没有成功读取任何数据文件")
    merged_df = pd.concat(all_data, ignore_index=True)
    print(f"数据合并完成,总样本数: {len(merged_df)}")
    return merged_df


def normalize_data(df):
    """Min-max scale every column of *df* into [0, 1].

    The fitted scaler is persisted to ``scaler.pkl`` so inference code can
    apply the identical transform later.

    Args:
        df (pd.DataFrame): raw numeric data.

    Returns:
        tuple[pd.DataFrame, MinMaxScaler]: the scaled frame (same columns)
        and the fitted scaler.
    """
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(df)
    scaled_df = pd.DataFrame(scaled_data, columns=df.columns)
    # Persist the scaler for reuse at prediction time.
    joblib.dump(scaler, "scaler.pkl")
    print("归一化器已保存为 scaler.pkl")
    return scaled_df, scaler


class IsolationForestModel:
    """Per-column Isolation Forest anomaly detection."""

    def __init__(self):
        # One fitted IsolationForest per column name.
        self.models = {}

    def fit(self, df):
        """Train an independent Isolation Forest on each column of *df*."""
        for column in df.columns:
            # sklearn expects a 2-D array, hence the reshape.
            X = df[column].values.reshape(-1, 1)
            model = IsolationForest(n_estimators=100, contamination='auto',
                                    random_state=42)
            model.fit(X)
            self.models[column] = model
            print(f"已训练 {column} 的孤立森林模型")
        return self

    def predict(self, df):
        """Predict per column: -1 marks an anomaly, 1 marks normal.

        Raises:
            ValueError: if a column of *df* has no trained model.
        """
        results = pd.DataFrame()
        for column in df.columns:
            if column not in self.models:
                raise ValueError(f"没有 {column} 的模型,请先训练")
            X = df[column].values.reshape(-1, 1)
            results[column] = self.models[column].predict(X)
        return results

    def save(self, filename="isolation_forest_models.pkl"):
        """Serialize this model collection to *filename* via joblib."""
        joblib.dump(self, filename)
        print(f"孤立森林模型已保存为 {filename}")


class ThreeSigmaModel:
    """Per-column 3-sigma (mean +/- n*std) anomaly detection."""

    def __init__(self):
        # Maps column name -> (mean, std) computed at fit time.
        self.stats = {}

    def fit(self, df):
        """Record each column's mean and standard deviation."""
        for column in df.columns:
            mean = df[column].mean()
            std = df[column].std()
            self.stats[column] = (mean, std)
            print(f"已计算 {column} 的3σ统计量")
        return self

    def predict(self, df, n_sigma=3):
        """Flag values outside mean +/- n_sigma*std: -1 anomaly, 1 normal.

        Args:
            df (pd.DataFrame): data to score; columns must have been fitted.
            n_sigma (int): width of the acceptance band in standard deviations.

        Raises:
            ValueError: if a column of *df* has no recorded statistics.
        """
        results = pd.DataFrame()
        for column in df.columns:
            if column not in self.stats:
                raise ValueError(f"没有 {column} 的统计量,请先训练")
            mean, std = self.stats[column]
            lower_bound = mean - n_sigma * std
            upper_bound = mean + n_sigma * std
            is_outlier = (df[column] < lower_bound) | (df[column] > upper_bound)
            # Encode as -1 (anomaly) / 1 (normal) to match the sklearn models.
            results[column] = np.where(is_outlier, -1, 1)
        return results

    def save(self, filename="three_sigma_model.pkl"):
        """Serialize this model to *filename* via joblib."""
        joblib.dump(self, filename)
        print(f"3σ模型已保存为 {filename}")


class OneClassSVMModel:
    """Per-column One-Class SVM anomaly detection.

    Restored from a commented-out state: main() instantiates this class, so
    leaving it disabled caused a NameError at runtime.
    """

    def __init__(self):
        # One fitted OneClassSVM per column name.
        self.models = {}

    def fit(self, df):
        """Train an independent One-Class SVM on each column of *df*."""
        for column in df.columns:
            # sklearn expects a 2-D array, hence the reshape.
            X = df[column].values.reshape(-1, 1)
            model = OneClassSVM(nu=0.05, kernel='rbf', gamma='scale')
            model.fit(X)
            self.models[column] = model
            print(f"已训练 {column} 的One-Class SVM模型")
        return self

    def predict(self, df):
        """Predict per column: -1 marks an anomaly, 1 marks normal.

        Raises:
            ValueError: if a column of *df* has no trained model.
        """
        results = pd.DataFrame()
        for column in df.columns:
            if column not in self.models:
                raise ValueError(f"没有 {column} 的模型,请先训练")
            X = df[column].values.reshape(-1, 1)
            results[column] = self.models[column].predict(X)
        return results

    def save(self, filename="one_class_svm_models.pkl"):
        """Serialize this model collection to *filename* via joblib."""
        joblib.dump(self, filename)
        print(f"One-Class SVM模型已保存为 {filename}")


def main():
    """Train and persist all three anomaly detectors, then run a sample prediction."""
    # 1. Load and merge the raw data.
    print("开始加载数据...")
    merged_data = load_and_merge_data()

    # 2. Normalize to [0, 1] per column.
    print("\n开始数据归一化...")
    normalized_data, scaler = normalize_data(merged_data)

    # 3. Isolation Forest.
    print("\n开始训练孤立森林模型...")
    if_model = IsolationForestModel()
    if_model.fit(normalized_data)
    if_model.save()

    # 4. 3-sigma model.
    print("\n开始训练3σ模型...")
    ts_model = ThreeSigmaModel()
    ts_model.fit(normalized_data)
    ts_model.save()

    # 5. One-Class SVM.
    print("\n开始训练One-Class SVM模型...")
    ocsvm_model = OneClassSVMModel()
    ocsvm_model.fit(normalized_data)
    ocsvm_model.save()

    print("\n所有模型训练和保存完成!")

    # Smoke-test the models on up to 100 randomly drawn rows
    # (all rows if the dataset holds fewer than 100).
    sample_data = normalized_data.sample(min(100, len(normalized_data)))

    if_predictions = if_model.predict(sample_data)
    print("\n孤立森林预测结果(-1表示异常,1表示正常):")
    print(if_predictions)

    ts_predictions = ts_model.predict(sample_data)
    print("\n3σ预测结果(-1表示异常,1表示正常):")
    print(ts_predictions)

    ocsvm_predictions = ocsvm_model.predict(sample_data)
    print("\nOne-Class SVM预测结果(-1表示异常,1表示正常):")
    print(ocsvm_predictions)


if __name__ == "__main__":
    main()