# data_preprocessor.py
# data_preprocessor.py
import os
import torch
import joblib
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from concurrent.futures import ThreadPoolExecutor
  11. class DataPreprocessor:
  12. """数据预处理类"""
  13. # 定义必须保留的列
  14. COLUMNS_TO_KEEP = [
  15. 'index',
  16. "AR.1#UF_JSFLOW_O", # 1#UF进水流量
  17. "AR.2#UF_JSFLOW_O", # 2#UF进水流量
  18. "AR.1#RO_JSFLOW_O", # 1#RO进水流量
  19. "AR.2#RO_JSFLOW_O", # 2#RO进水流量
  20. "AR.1#UF_JSPRESS_O", # 1#UF进水压力
  21. "AR.2#UF_JSPRESS_O", # 2#UF进水压力
  22. "AR.1#RO_JSPRESS_O", # 1#RO进水压力
  23. "AR.2#RO_JSPRESS_O", # 2#RO进水压力
  24. "AR.1#RO_EDJSPRESS_O", # 1#RO二段进水压力
  25. "AR.1#RO_SDJSPRESS_O", # 1#RO三段进水压力
  26. "AR.2#RO_EDJSPRESS_O", # 2#RO二段进水压力
  27. "AR.2#RO_SDJSPRESS_O", # 2#RO三段进水压力
  28. "AR.ZJS_TEMP_O", # 进水温度
  29. "AR.ZJS_ZD_O", # UF进水浊度
  30. "AR.RO_JSDD_O", # RO进水电导
  31. "AR.RO_JSORP_O", # RO进水ORP
  32. "AR.RO_JSPH_O", # RO进水PH
  33. "AR.1#UF_V_FB_O", # 1#UF调节阀开度反馈
  34. "AR.2#UF_V_FB_O", # 2#UF调节阀开度反馈
  35. "AR.1#UFBWB_FRE_FB_O", # 1#UF反洗泵频率反馈
  36. "AR.2#UFBWB_FRE_FB_O", # 2#UF反洗泵频率反馈
  37. "AR.1#RODJB_FRE_FB_O", # 1#RO段间泵频率反馈
  38. "AR.1#ROGYB_FRE_FB_O", # 1#RO高压泵频率反馈
  39. "AR.1#RODJB_CZ_O", # 1#RO段间泵测振反馈
  40. "AR.1#ROGYB_CZ_O", # 1#RO高压泵测振反馈
  41. "AR.2#RODJB_CZ_O", # 2#RO段间泵测振反馈
  42. "AR.2#ROGYB_CZ_O", # 2#RO高压泵测振反馈
  43. "AR.ROGSB_FRE_FB_O", # RO供水泵频率反馈
  44. "AR.UFGSB_FRE_FB_O", # UF供水泵频率反馈
  45. "AR.V_UF1_TJV_KD_FB", # UF1调节阀开度反馈
  46. "AR.V_UF2_TJV_KD_FB", # UF2调节阀开度反馈
  47. "AR.CS_LEVEL_O", # RO产水箱液位
  48. "AR.UF_CSLEVEL_O", # UF产水箱液位
  49. "AR.UF1_SSD_KMYC", # UF1跨膜压差
  50. "AR.UF2_SSD_KMYC", # UF2跨膜压差
  51. "AR.RO1_2D_YC", # RO1二段压差
  52. "AR.PUBLIC_BY_REAL_1", # RO1三段压差
  53. "1#RO_CSFLOW", # 1#RO产水流量
  54. ]
  55. @staticmethod
  56. def load_and_process_data(args, data):
  57. """加载并处理数据,划分训练/验证/测试集"""
  58. # 处理日期
  59. data['date'] = pd.to_datetime(data['date'])
  60. time_interval = pd.Timedelta(minutes=(4 * args.resolution / 60))
  61. window_time_span = time_interval * (args.seq_len + 1)
  62. val_start_date = pd.to_datetime(args.val_start_date)
  63. test_start_date = pd.to_datetime(args.test_start_date)
  64. # 调整时间窗口
  65. adjusted_val_start = val_start_date - window_time_span
  66. adjusted_test_start = test_start_date - window_time_span
  67. train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
  68. (data['date'] <= pd.to_datetime(args.train_end_date))
  69. val_mask = (data['date'] >= adjusted_val_start) & \
  70. (data['date'] <= pd.to_datetime(args.val_end_date))
  71. test_mask = (data['date'] >= adjusted_test_start) & \
  72. (data['date'] <= pd.to_datetime(args.test_end_date))
  73. train_data = data[train_mask].reset_index(drop=True)
  74. val_data = data[val_mask].reset_index(drop=True)
  75. test_data = data[test_mask].reset_index(drop=True)
  76. train_data = train_data.drop(columns=['date'])
  77. val_data = val_data.drop(columns=['date'])
  78. test_data = test_data.drop(columns=['date'])
  79. # 创建数据集
  80. train_supervised = DataPreprocessor.create_supervised_dataset(args, train_data, 1)
  81. val_supervised = DataPreprocessor.create_supervised_dataset(args, val_data, 1)
  82. test_supervised = DataPreprocessor.create_supervised_dataset(args, test_data, args.step_size)
  83. # 转换为DataLoader
  84. train_loader = DataPreprocessor.load_data(args, train_supervised, shuffle=True)
  85. val_loader = DataPreprocessor.load_data(args, val_supervised, shuffle=False)
  86. test_loader = DataPreprocessor.load_data(args, test_supervised, shuffle=False)
  87. return train_loader, val_loader, test_loader, data
  88. @staticmethod
  89. def read_and_combine_csv_files(args):
  90. """读取文件并进行特征筛选和预处理"""
  91. current_dir = os.path.dirname(__file__)
  92. parent_dir = os.path.dirname(current_dir)
  93. args.data_dir = os.path.join(parent_dir, args.data_dir)
  94. def read_file(file_count):
  95. file_name = args.file_pattern.format(file_count)
  96. file_path = os.path.join(args.data_dir, file_name)
  97. try:
  98. df = pd.read_csv(file_path)
  99. # 确保只读取需要的列,若列不存在则会报错提示
  100. return df[DataPreprocessor.COLUMNS_TO_KEEP]
  101. except KeyError as e:
  102. print(f"文件 {file_name} 中缺少列: {e}")
  103. raise
  104. file_indices = list(range(args.start_files, args.end_files + 1))
  105. max_workers = os.cpu_count()
  106. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  107. results = list(tqdm(executor.map(read_file, file_indices),
  108. total=len(file_indices),
  109. desc="正在读取文件"))
  110. all_data = pd.concat(results, ignore_index=True)
  111. # 确保列顺序一致
  112. all_data = all_data[DataPreprocessor.COLUMNS_TO_KEEP]
  113. # 下采样
  114. chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)
  115. # 处理特征
  116. chunk = DataPreprocessor.process_date(chunk, args)
  117. chunk = DataPreprocessor.scaler_data(chunk, args)
  118. return chunk
  119. @staticmethod
  120. def process_date(data, args):
  121. data = data.rename(columns={'index': 'date'})
  122. data['date'] = pd.to_datetime(data['date'])
  123. time_features = []
  124. # 固定生成分钟级和日级特征,保持与Predictor一致
  125. data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
  126. data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)
  127. data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)
  128. data['day_of_year'] = data['date'].dt.dayofyear
  129. data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
  130. data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
  131. time_features.extend(['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos'])
  132. data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
  133. other_columns = [col for col in data.columns if col not in ['date'] and col not in time_features]
  134. data = data[['date'] + time_features + other_columns]
  135. return data
  136. @staticmethod
  137. def scaler_data(data, args):
  138. date_col = data[['date']]
  139. data_to_scale = data.drop(columns=['date'])
  140. scaler = MinMaxScaler(feature_range=(0, 1))
  141. scaled_data = scaler.fit_transform(data_to_scale)
  142. joblib.dump(scaler, args.scaler_path)
  143. scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
  144. scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)
  145. return scaled_data
  146. @staticmethod
  147. def create_supervised_dataset(args, data, step_size):
  148. data = pd.DataFrame(data)
  149. cols = []
  150. col_names = []
  151. feature_columns = data.columns.tolist()
  152. # 输入序列
  153. for col in feature_columns:
  154. for i in range(args.seq_len - 1, -1, -1):
  155. cols.append(data[[col]].shift(i))
  156. col_names.append(f"{col}(t-{i})")
  157. # 目标序列 (取最后labels_num列)
  158. target_columns = feature_columns[-args.labels_num:]
  159. for i in range(1, args.output_size + 1):
  160. for col in target_columns:
  161. cols.append(data[[col]].shift(-i))
  162. col_names.append(f"{col}(t+{i})")
  163. dataset = pd.concat(cols, axis=1)
  164. dataset.columns = col_names
  165. dataset = dataset.iloc[::step_size, :]
  166. dataset.dropna(inplace=True)
  167. return dataset
  168. @staticmethod
  169. def load_data(args, dataset, shuffle):
  170. input_length = args.seq_len
  171. n_features = args.feature_num
  172. labels_num = args.labels_num
  173. n_features_total = n_features * input_length
  174. n_labels_total = args.output_size * labels_num
  175. X = dataset.values[:, :n_features_total]
  176. y = dataset.values[:, n_features_total:n_features_total + n_labels_total]
  177. X = X.reshape(X.shape[0], input_length, n_features)
  178. X = torch.tensor(X, dtype=torch.float32).to(args.device)
  179. y = torch.tensor(y, dtype=torch.float32).to(args.device)
  180. dataset_tensor = TensorDataset(X, y)
  181. generator = torch.Generator()
  182. generator.manual_seed(args.random_seed)
  183. return DataLoader(dataset_tensor, batch_size=args.batch_size, shuffle=shuffle, generator=generator)