# data_preprocessor.py
import os
import torch
import joblib
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from concurrent.futures import ThreadPoolExecutor


class DataPreprocessor:
    """Data preprocessing utilities."""

    # Columns that must be kept
    COLUMNS_TO_KEEP = [
        'index',
        "water_in",                     # inlet water volume
        "water_out",                    # external supply water flow
        "RO1_TYL",                      # RO1 salt rejection rate
        "RO2_TYL",                      # RO2 salt rejection rate
        "UF1Per",                       # UF1 permeability
        "UF2Per",                       # UF2 permeability
        "2#RODJB_Eff",                  # 2# RO interstage pump efficiency
        "1#RODJB_Eff",                  # 1# RO interstage pump efficiency
        "2#ROGYB_Eff",                  # 2# RO high-pressure pump efficiency
        "1#ROGYB_Eff",                  # 1# RO high-pressure pump efficiency
        "ROHSL",                        # RO recovery rate
        "ns=3;s=1#RO_CSDD_O",           # 1# RO permeate conductivity
        "ns=3;s=1#RO_CSPRESS_O",        # 1# RO permeate pressure
        "ns=3;s=1#RO_EDCSFLOW_O",       # 1# RO stage-2 permeate flow
        "ns=3;s=1#RO_EDJSPRESS_O",      # 1# RO stage-2 feed pressure
        "ns=3;s=1#RO_EDNSPRESS_O",      # 1# RO stage-2 concentrate pressure
        "ns=3;s=1#RO_JSFLOW_O",         # 1# RO feed flow
        "ns=3;s=1#RO_JSPRESS_O",        # 1# RO feed pressure
        "ns=3;s=1#RO_NSFLOW_O",         # 1# RO concentrate flow
        "ns=3;s=1#RO_SDCSFLOW_O",       # 1# RO stage-3 permeate flow
        "ns=3;s=1#RO_SDJSPRESS_O",      # 1# RO stage-3 feed pressure
        "ns=3;s=1#RO_SDNSPRESS_O",      # 1# RO stage-3 concentrate pressure
        "ns=3;s=1#RODJB_CUR_FB_O",      # 1# RO interstage pump current feedback
        "ns=3;s=1#RODJB_CZ_O",          # 1# RO interstage pump vibration feedback
        "ns=3;s=1#RODJB_FRE_FB_O",      # 1# RO interstage pump frequency feedback
        "ns=3;s=1#ROGYB_CUR_FB_O",      # 1# RO high-pressure pump current feedback
        "ns=3;s=1#ROGYB_CZ_O",          # 1# RO high-pressure pump vibration feedback
        "ns=3;s=1#ROGYB_FRE_FB_O",      # 1# RO high-pressure pump frequency feedback
        "ns=3;s=1#UF_CSPRESS_O",        # 1# UF permeate pressure
        "ns=3;s=1#UF_JSFLOW_O",         # 1# UF feed flow
        "ns=3;s=1#UF_JSPRESS_O",        # 1# UF feed pressure
        "ns=3;s=1#UF_V_FB_O",           # 1# UF control valve opening feedback
        "ns=3;s=1#UFBWB_CUR_FB_O",      # 1# UF backwash pump current feedback
        "ns=3;s=1#UFBWB_FRE_FB_O",      # 1# UF backwash pump frequency feedback
        "ns=3;s=2#RO_CSDD_O",           # 2# RO permeate conductivity
        "ns=3;s=2#RO_CSPRESS_O",        # 2# RO permeate pressure
        "ns=3;s=2#RO_EDCSFLOW_O",       # 2# RO stage-2 permeate flow
        "ns=3;s=2#RO_EDJSPRESS_O",      # 2# RO stage-2 feed pressure
        "ns=3;s=2#RO_EDNSPRESS_O",      # 2# RO stage-2 concentrate pressure
        "ns=3;s=2#RO_JSFLOW_O",         # 2# RO feed flow
        "ns=3;s=2#RO_JSPRESS_O",        # 2# RO feed pressure
        "ns=3;s=2#RO_NSFLOW_O",         # 2# RO concentrate flow
        "ns=3;s=2#RO_SDCSFLOW_O",       # 2# RO stage-3 permeate flow
        "ns=3;s=2#RO_SDJSPRESS_O",      # 2# RO stage-3 feed pressure
        "ns=3;s=2#RO_SDNSPRESS_O",      # 2# RO stage-3 concentrate pressure
        "ns=3;s=2#RODJB_CUR_FB_O",      # 2# RO interstage pump current feedback
        "ns=3;s=2#RODJB_CZ_O",          # 2# RO interstage pump vibration feedback
        "ns=3;s=2#RODJB_FRE_FB_O",      # 2# RO interstage pump frequency feedback
        "ns=3;s=2#ROGYB_CUR_FB_O",      # 2# RO high-pressure pump current feedback
        "ns=3;s=2#ROGYB_CZ_O",          # 2# RO high-pressure pump vibration feedback
        "ns=3;s=2#ROGYB_FRE_FB_O",      # 2# RO high-pressure pump frequency feedback
        "ns=3;s=2#UF_CSPRESS_O",        # 2# UF permeate pressure
        "ns=3;s=2#UF_JSFLOW_O",         # 2# UF feed flow
        "ns=3;s=2#UF_JSPRESS_O",        # 2# UF feed pressure
        "ns=3;s=2#UF_V_FB_O",           # 2# UF control valve opening feedback
        "ns=3;s=2#UFBWB_CUR_FB_O",      # 2# UF backwash pump current feedback
        "ns=3;s=2#UFBWB_FRE_FB_O",      # 2# UF backwash pump frequency feedback
        "ns=3;s=RO_JSDD_O",             # RO feed conductivity
        "ns=3;s=RO_JSORP_O",            # RO feed ORP
        "ns=3;s=RO_JSPH_O",             # RO feed pH
        "ns=3;s=RO1_1DUAN_CS_FLOW",     # RO1 stage-1 permeate flow
        "ns=3;s=ZJS_PRESS_O",           # feed pressure
        "ns=3;s=ZJS_TEMP_O",            # feed temperature
        "ns=3;s=ZJS_ZD_O",              # UF feed turbidity
        "ns=3;s=PUBLIC_RO1_MTL",        # RO1 membrane flux
        "ns=3;s=PUBLIC_RO2_MTL",        # RO2 membrane flux
        "ns=3;s=UF1_SSD_KMYC",          # UF1 transmembrane pressure
        "ns=3;s=UF2_SSD_KMYC",          # UF2 transmembrane pressure
        "ns=3;s=RO1_1D_YC",             # RO1 stage-1 differential pressure
        "ns=3;s=RO1_2D_YC",             # RO1 stage-2 differential pressure
        "ns=3;s=RO2_1D_YC",             # RO2 stage-1 differential pressure
        "ns=3;s=RO2_2D_YC",             # RO2 stage-2 differential pressure
        "ns=3;s=PUBLIC_BY_REAL_1",      # RO1 stage-3 differential pressure
        "ns=3;s=PUBLIC_BY_REAL_2",      # RO2 stage-3 differential pressure
    ]

    @staticmethod
    def load_and_process_data(args, data):
        """Load and process the data, splitting it into train/val/test sets."""
        # Parse dates
        data['date'] = pd.to_datetime(data['date'])
        # Time between two consecutive retained rows, in minutes
        time_interval = pd.Timedelta(minutes=(4 * args.resolution / 60))
        window_time_span = time_interval * (args.seq_len + 1)
        val_start_date = pd.to_datetime(args.val_start_date)
        test_start_date = pd.to_datetime(args.test_start_date)
        # Shift the val/test window starts back by one full input window so
        # the first sample of each split has a complete input history
        adjusted_val_start = val_start_date - window_time_span
        adjusted_test_start = test_start_date - window_time_span
        train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
                     (data['date'] <= pd.to_datetime(args.train_end_date))
        val_mask = (data['date'] >= adjusted_val_start) & \
                   (data['date'] <= pd.to_datetime(args.val_end_date))
        test_mask = (data['date'] >= adjusted_test_start) & \
                    (data['date'] <= pd.to_datetime(args.test_end_date))
        train_data = data[train_mask].reset_index(drop=True)
        val_data = data[val_mask].reset_index(drop=True)
        test_data = data[test_mask].reset_index(drop=True)
        train_data = train_data.drop(columns=['date'])
        val_data = val_data.drop(columns=['date'])
        test_data = test_data.drop(columns=['date'])
        # Build supervised datasets
        train_supervised = DataPreprocessor.create_supervised_dataset(args, train_data, 1)
        val_supervised = DataPreprocessor.create_supervised_dataset(args, val_data, 1)
        test_supervised = DataPreprocessor.create_supervised_dataset(args, test_data, args.step_size)
        # Wrap in DataLoaders
        train_loader = DataPreprocessor.load_data(args, train_supervised, shuffle=True)
        val_loader = DataPreprocessor.load_data(args, val_supervised, shuffle=False)
        test_loader = DataPreprocessor.load_data(args, test_supervised, shuffle=False)
        return train_loader, val_loader, test_loader, data
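
    # Worked example of the window arithmetic above (a sketch; the 4-second
    # raw sampling interval is an assumption read off the factor in
    # 4 * resolution / 60, not stated anywhere in this file). If the raw
    # data arrives every 4 s and resolution=15, retained rows are
    # 4 * 15 = 60 s = 1 min apart, so time_interval = 1 min; with
    # seq_len=96, window_time_span = (96 + 1) * 1 min = 97 min.
    # adjusted_val_start therefore reaches 97 minutes before
    # val_start_date, giving the first validation sample a full
    # seq_len-step input history inside the validation slice.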

    @staticmethod
    def read_and_combine_csv_files(args):
        """Read the CSV files, select features, and preprocess."""
        current_dir = os.path.dirname(__file__)
        parent_dir = os.path.dirname(current_dir)
        args.data_dir = os.path.join(parent_dir, args.data_dir)

        def read_file(file_count):
            file_name = args.file_pattern.format(file_count)
            file_path = os.path.join(args.data_dir, file_name)
            try:
                df = pd.read_csv(file_path)
                # Keep only the required columns; a missing column raises KeyError
                return df[DataPreprocessor.COLUMNS_TO_KEEP]
            except KeyError as e:
                print(f"File {file_name} is missing columns: {e}")
                raise

        file_indices = list(range(args.start_files, args.end_files + 1))
        max_workers = os.cpu_count()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(tqdm(executor.map(read_file, file_indices),
                                total=len(file_indices),
                                desc="Reading files"))
        all_data = pd.concat(results, ignore_index=True)
        # Enforce a consistent column order
        all_data = all_data[DataPreprocessor.COLUMNS_TO_KEEP]
        # Downsample: keep every `resolution`-th row
        chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)
        # Feature engineering and scaling
        chunk = DataPreprocessor.process_date(chunk, args)
        chunk = DataPreprocessor.scaler_data(chunk, args)
        return chunk

    @staticmethod
    def process_date(data, args):
        """Rename the raw 'index' column to 'date' and add cyclic time features."""
        data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        time_features = []
        # Always generate minute-of-day and day-of-year features,
        # consistent with the Predictor
        data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
        data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)
        data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)
        data['day_of_year'] = data['date'].dt.dayofyear
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
        time_features.extend(['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos'])
        data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
        other_columns = [col for col in data.columns
                         if col != 'date' and col not in time_features]
        data = data[['date'] + time_features + other_columns]
        return data

    @staticmethod
    def scaler_data(data, args):
        """Min-max scale all feature columns and persist the fitted scaler."""
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        # Note: the scaler is fitted on the full dataset, before the
        # train/val/test split performed in load_and_process_data
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(data_to_scale)
        joblib.dump(scaler, args.scaler_path)
        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
        scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)
        return scaled_data
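
    # The method below flattens the time series into one supervised-learning
    # row per timestep. A small worked example (hypothetical sizes, not from
    # this project): with seq_len=3, output_size=2, labels_num=1 and columns
    # [a, b], each row of the result is laid out as
    #   a(t-2), a(t-1), a(t-0), b(t-2), b(t-1), b(t-0), b(t+1), b(t+2)
    # i.e. the lags are grouped per feature (feature-major) and the last
    # labels_num columns supply the forecast targets. load_data relies on
    # exactly this layout when it reshapes X back to (batch, seq_len, features).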

    @staticmethod
    def create_supervised_dataset(args, data, step_size):
        """Turn a scaled frame into (input window, forecast horizon) rows."""
        data = pd.DataFrame(data)
        cols = []
        col_names = []
        feature_columns = data.columns.tolist()
        # Input sequence: seq_len lagged copies of every feature,
        # grouped per feature (feature-major)
        for col in feature_columns:
            for i in range(args.seq_len - 1, -1, -1):
                cols.append(data[[col]].shift(i))
                col_names.append(f"{col}(t-{i})")
        # Target sequence: the last labels_num columns, shifted forward
        target_columns = feature_columns[-args.labels_num:]
        for i in range(1, args.output_size + 1):
            for col in target_columns:
                cols.append(data[[col]].shift(-i))
                col_names.append(f"{col}(t+{i})")
        dataset = pd.concat(cols, axis=1)
        dataset.columns = col_names
        # Subsample rows, then drop windows that ran off either end of the series
        dataset = dataset.iloc[::step_size, :].dropna()
        return dataset

    @staticmethod
    def load_data(args, dataset, shuffle):
        """Wrap a supervised dataset into a seeded DataLoader."""
        input_length = args.seq_len
        n_features = args.feature_num
        labels_num = args.labels_num
        n_features_total = n_features * input_length
        n_labels_total = args.output_size * labels_num
        X = dataset.values[:, :n_features_total]
        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]
        # The supervised columns are feature-major (all lags of feature 1,
        # then all lags of feature 2, ...), so reshape to
        # (samples, features, seq_len) first and transpose to
        # (samples, seq_len, features); a direct reshape to
        # (samples, seq_len, features) would scramble the two axes.
        X = X.reshape(X.shape[0], n_features, input_length).transpose(0, 2, 1)
        X = torch.tensor(X, dtype=torch.float32).to(args.device)
        y = torch.tensor(y, dtype=torch.float32).to(args.device)
        dataset_tensor = TensorDataset(X, y)
        generator = torch.Generator()
        generator.manual_seed(args.random_seed)
        return DataLoader(dataset_tensor, batch_size=args.batch_size,
                          shuffle=shuffle, generator=generator)
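

# Minimal usage sketch (all argument values below are hypothetical; the real
# values come from this project's argument parser, which is not part of this
# file, and the CSVs must exist on disk for this to run):
if __name__ == "__main__":
    from types import SimpleNamespace

    args = SimpleNamespace(
        data_dir="data",                 # resolved relative to the parent dir
        file_pattern="data_{}.csv",      # hypothetical file-name pattern
        start_files=1, end_files=10,
        resolution=15,                   # keep every 15th raw row
        seq_len=96, output_size=12,
        # features = COLUMNS_TO_KEEP minus 'index' (renamed to 'date' and
        # dropped) plus the 4 cyclic time features added by process_date
        feature_num=len(DataPreprocessor.COLUMNS_TO_KEEP) + 3,
        labels_num=2,
        step_size=12,
        batch_size=64,
        random_seed=42,
        device="cuda" if torch.cuda.is_available() else "cpu",
        scaler_path="scaler.pkl",
        train_start_date="2023-01-01", train_end_date="2023-09-30",
        val_start_date="2023-10-01", val_end_date="2023-11-15",
        test_start_date="2023-11-16", test_end_date="2023-12-31",
    )

    data = DataPreprocessor.read_and_combine_csv_files(args)
    train_loader, val_loader, test_loader, _ = \
        DataPreprocessor.load_and_process_data(args, data)
    print(f"train batches: {len(train_loader)}, "
          f"val batches: {len(val_loader)}, "
          f"test batches: {len(test_loader)}")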