| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- import os
- import pandas as pd
- import numpy as np
- import joblib
- from sklearn.preprocessing import MinMaxScaler
- from sklearn.ensemble import IsolationForest
- from sklearn.svm import OneClassSVM
# Configure matplotlib so CJK glyphs render correctly in plot labels.
import matplotlib.pyplot as plt
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]

# Folder containing the exported CSV datasets.
data_folder = "datasets_export_xishan"

# Filename template -> columns to read from each matching file.
# Each template's "{}" is filled with an index 1..26 (see load_and_merge_data).
file_info = {
    "data_export5_{}.csv": ["UF1Per", "UF2Per", "UF3Per", "UF4Per"],
    "data_export8_{}.csv": ["C.M.RO1_DB@DPT_1", "C.M.RO1_DB@DPT_2",
                            "C.M.RO2_DB@DPT_1", "C.M.RO2_DB@DPT_2"],
    "data_export9_{}.csv": ["C.M.RO3_DB@DPT_1", "C.M.RO3_DB@DPT_2",
                            "C.M.RO4_DB@DPT_1", "C.M.RO4_DB@DPT_2"],
    "data_export11_{}.csv": ["RO1_CSFlow", "RO2_CSFlow", "RO3_CSFlow", "RO4_CSFlow"]
}
def load_and_merge_data(folder=None, info=None):
    """Load every dataset CSV and concatenate them into one DataFrame.

    Args:
        folder: Directory containing the CSV files. Defaults to the
            module-level ``data_folder``.
        info: Mapping of filename template -> list of columns to read.
            Defaults to the module-level ``file_info``.

    Returns:
        pd.DataFrame: All successfully read files concatenated row-wise
        with a fresh integer index.

    Raises:
        ValueError: If not a single file could be read.
    """
    folder = data_folder if folder is None else folder
    info = file_info if info is None else info
    all_data = []

    # Each filename template expands to 26 numbered files (indices 1..26).
    for file_template, columns in info.items():
        for i in range(1, 27):
            filename = file_template.format(i)
            file_path = os.path.join(folder, filename)
            try:
                # Read only the columns relevant to this template.
                df = pd.read_csv(file_path, usecols=columns)
                all_data.append(df)
                print(f"成功读取: {filename}")
            except Exception as e:
                # Best effort: a missing or malformed file is reported,
                # not fatal — the remaining files are still loaded.
                print(f"读取文件 {filename} 时出错: {e}")

    if not all_data:
        raise ValueError("没有成功读取任何数据文件")

    merged_df = pd.concat(all_data, ignore_index=True)
    print(f"数据合并完成,总样本数: {len(merged_df)}")
    return merged_df
def normalize_data(df):
    """Min-max scale every column of *df* into the [0, 1] range.

    The fitted scaler is persisted to ``scaler.pkl`` so the identical
    transformation can be re-applied later (e.g. at inference time).

    Args:
        df: Numeric DataFrame to normalize.

    Returns:
        tuple: (scaled DataFrame with the original column names,
        the fitted ``MinMaxScaler`` instance).
    """
    scaler = MinMaxScaler()
    scaled_df = pd.DataFrame(scaler.fit_transform(df), columns=df.columns)

    # Persist the scaler for reuse outside this run.
    joblib.dump(scaler, "scaler.pkl")
    print("归一化器已保存为 scaler.pkl")

    return scaled_df, scaler
class IsolationForestModel:
    """Per-column anomaly detection: one IsolationForest per feature.

    Labels follow the sklearn convention: -1 = anomaly, 1 = normal.
    """

    def __init__(self):
        # Maps column name -> fitted IsolationForest.
        self.models = {}

    def fit(self, df):
        """Fit one IsolationForest per column of *df*.

        Returns:
            self, to allow fluent chaining.
        """
        for column in df.columns:
            # sklearn estimators require a 2-D array, hence the reshape.
            X = df[column].values.reshape(-1, 1)
            model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
            model.fit(X)
            self.models[column] = model
            print(f"已训练 {column} 的孤立森林模型")
        return self

    def predict(self, df):
        """Return a DataFrame of per-column labels (-1 anomaly, 1 normal).

        Raises:
            ValueError: If a column of *df* has no fitted model.
        """
        results = pd.DataFrame()
        for column in df.columns:
            if column not in self.models:
                raise ValueError(f"没有 {column} 的模型,请先训练")
            X = df[column].values.reshape(-1, 1)
            results[column] = self.models[column].predict(X)
        return results

    def save(self, filename="isolation_forest_models.pkl"):
        """Serialize this model container to *filename* via joblib."""
        joblib.dump(self, filename)
        # Fixed: the message previously did not interpolate the filename.
        print(f"孤立森林模型已保存为 {filename}")
class ThreeSigmaModel:
    """Per-column 3-sigma anomaly detection.

    A value is flagged as an anomaly when it falls outside
    mean +/- n_sigma * std of its column's training distribution.
    Labels follow the sklearn convention: -1 = anomaly, 1 = normal.
    """

    def __init__(self):
        # Maps column name -> (mean, std) learned from the training data.
        self.stats = {}

    def fit(self, df):
        """Record the mean and standard deviation of every column.

        Returns:
            self, to allow fluent chaining.
        """
        for column in df.columns:
            mean = df[column].mean()
            std = df[column].std()
            self.stats[column] = (mean, std)
            print(f"已计算 {column} 的3σ统计量")
        return self

    def predict(self, df, n_sigma=3):
        """Return a DataFrame of per-column labels (-1 anomaly, 1 normal).

        Args:
            df: Data to score; columns must have been seen during fit().
            n_sigma: Width of the acceptance band in standard deviations.

        Raises:
            ValueError: If a column of *df* has no recorded statistics.
        """
        results = pd.DataFrame()
        for column in df.columns:
            if column not in self.stats:
                raise ValueError(f"没有 {column} 的统计量,请先训练")

            mean, std = self.stats[column]
            # Acceptance band around the training mean.
            lower_bound = mean - n_sigma * std
            upper_bound = mean + n_sigma * std

            is_outlier = (df[column] < lower_bound) | (df[column] > upper_bound)
            # Map boolean outlier mask to the -1/1 label convention.
            results[column] = np.where(is_outlier, -1, 1)
        return results

    def save(self, filename="three_sigma_model.pkl"):
        """Serialize this model to *filename* via joblib."""
        joblib.dump(self, filename)
        # Fixed: the message previously did not interpolate the filename.
        print(f"3σ模型已保存为 {filename}")
# NOTE(review): this class was previously disabled by wrapping it in a
# triple-quoted string, yet main() still instantiates it, which raised
# NameError at runtime. Re-enabled here so the pipeline runs end to end.
class OneClassSVMModel:
    """Per-column anomaly detection: one One-Class SVM per feature.

    Labels follow the sklearn convention: -1 = anomaly, 1 = normal.
    """

    def __init__(self):
        # Maps column name -> fitted OneClassSVM.
        self.models = {}

    def fit(self, df):
        """Fit one One-Class SVM per column of *df*.

        Returns:
            self, to allow fluent chaining.
        """
        for column in df.columns:
            # sklearn estimators require a 2-D array, hence the reshape.
            X = df[column].values.reshape(-1, 1)
            model = OneClassSVM(nu=0.05, kernel='rbf', gamma='scale')
            model.fit(X)
            self.models[column] = model
            print(f"已训练 {column} 的One-Class SVM模型")
        return self

    def predict(self, df):
        """Return a DataFrame of per-column labels (-1 anomaly, 1 normal).

        Raises:
            ValueError: If a column of *df* has no fitted model.
        """
        results = pd.DataFrame()
        for column in df.columns:
            if column not in self.models:
                raise ValueError(f"没有 {column} 的模型,请先训练")
            X = df[column].values.reshape(-1, 1)
            results[column] = self.models[column].predict(X)
        return results

    def save(self, filename="one_class_svm_models.pkl"):
        """Serialize this model container to *filename* via joblib."""
        joblib.dump(self, filename)
        # Fixed: the message previously did not interpolate the filename.
        print(f"One-Class SVM模型已保存为 {filename}")
def main():
    """End-to-end pipeline: load data, normalize, train and save models."""
    # 1. Load and merge the raw CSV exports.
    print("开始加载数据...")
    merged_data = load_and_merge_data()

    # 2. Normalize every column into [0, 1].
    print("\n开始数据归一化...")
    normalized_data, scaler = normalize_data(merged_data)

    # 3. Train and save the Isolation Forest models.
    print("\n开始训练孤立森林模型...")
    if_model = IsolationForestModel()
    if_model.fit(normalized_data)
    if_model.save()

    # 4. Train and save the 3-sigma model.
    print("\n开始训练3σ模型...")
    ts_model = ThreeSigmaModel()
    ts_model.fit(normalized_data)
    ts_model.save()

    # 5. One-Class SVM is optional: its class definition may be disabled
    # (it is commented out in some revisions of this file), so look it up
    # dynamically and skip gracefully instead of crashing with NameError.
    ocsvm_model = None
    ocsvm_cls = globals().get("OneClassSVMModel")
    if ocsvm_cls is not None:
        print("\n开始训练One-Class SVM模型...")
        ocsvm_model = ocsvm_cls()
        ocsvm_model.fit(normalized_data)
        ocsvm_model.save()

    print("\n所有模型训练和保存完成!")

    # Spot-check predictions on up to 100 random samples
    # (fewer if the dataset itself is smaller).
    sample_data = normalized_data.sample(min(100, len(normalized_data)))

    # Isolation Forest predictions.
    if_predictions = if_model.predict(sample_data)
    print("\n孤立森林预测结果(-1表示异常,1表示正常):")
    print(if_predictions)

    # 3-sigma predictions.
    ts_predictions = ts_model.predict(sample_data)
    print("\n3σ预测结果(-1表示异常,1表示正常):")
    print(ts_predictions)

    # One-Class SVM predictions, only if the model was trained above.
    if ocsvm_model is not None:
        ocsvm_predictions = ocsvm_model.predict(sample_data)
        print("\nOne-Class SVM预测结果(-1表示异常,1表示正常):")
        print(ocsvm_predictions)


if __name__ == "__main__":
    main()
|