# data_preprocessor.py
import os
from concurrent.futures import ThreadPoolExecutor

import joblib
import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

class DataPreprocessor:
    """Data preprocessing class."""

    # Columns that must be retained
    COLUMNS_TO_KEEP = [
        "index",
        "water_out",                # external supply water flow
        "ns=3;s=AI_ROJSLL_OUT",     # feed water flow feedback
        "ns=3;s=AI_UFCSLL_OUT",     # UF product water flow feedback
        "ns=3;s=RO_1DJSLL_SSD",     # SSD_Flow_1djs
        "ns=3;s=RO_2DJSLL_SSD",     # SSD_Flow_2djs
        "ns=3;s=RO_NS_SSD",         # SSD_Flow_ns
        "ns=3;s=AI_JYCSLL1_OUT",    # product water flow meter 1 feedback
        "ns=3;s=AI_RODJYL_OUT",     # inter-stage pressure feedback
        "ns=3;s=AI_ROJSYL_OUT",     # feed water pressure feedback
        "ns=3;s=AI_UFCSYL_OUT",     # UF product water pressure feedback
        "ns=3;s=AI_JYCIPPH_OUT",    # CIP pH feedback
        "ns=3;s=AI_JYCSDD_OUT",     # external supply water conductivity feedback
        "ns=3;s=AI_UFCSZD_OUT",     # UF product water turbidity feedback
        "ns=3;s=AI_ROCSDD_OUT",     # product water conductivity feedback
        "ns=3;s=AI_UFJSORP_OUT",    # UF feed water ORP feedback
        "ns=3;s=AI_UFJSPH_OUT",     # UF feed water pH feedback
        "ns=3;s=AI_UFJSYW_OUT",     # UF feed water temperature feedback
        "ns=3;s=AI_JYROCSYW_OUT",   # RO product water level gauge feedback
        "ns=3;s=AI_JYSYW_OUT",      # acid level feedback
        "ns=3;s=AI_RODJB_FR_OUT",   # RO inter-stage pump frequency feedback
        "ns=3;s=AI_ROGSB_FR_OUT",   # RO supply pump frequency feedback
        "ns=3;s=AI_ROGYB_FR_OUT",   # RO high-pressure pump frequency feedback
        "ns=3;s=AI_UFFXB_FR_OUT",   # UF backwash pump frequency feedback
        "ns=3;s=AI_UFCSB_FR_OUT",   # UF product water pump frequency feedback
        "ns=3;s=UF_TMP",            # SSD transmembrane pressure difference
        "ns=3;s=RO_CHA1YL_SSD",     # SSD_PressCha1
        "ns=3;s=RO_CHA2YL_SSD",     # SSD_PressCha2
        "ns=3;s=RO_ZCS_SSD",        # SSD_Flow_zcs
    ]

    @staticmethod
    def load_and_process_data(args, data):
        """Load and process the data, splitting it into train/val/test sets."""
        # Parse dates
        data['date'] = pd.to_datetime(data['date'])
        # Time span of one downsampled row: 4 * args.resolution seconds, in minutes
        time_interval = pd.Timedelta(minutes=(4 * args.resolution / 60))
        window_time_span = time_interval * (args.seq_len + 1)
        val_start_date = pd.to_datetime(args.val_start_date)
        test_start_date = pd.to_datetime(args.test_start_date)
        # Shift the val/test start dates back by one input window so that the
        # first samples of each split have a complete history
        adjusted_val_start = val_start_date - window_time_span
        adjusted_test_start = test_start_date - window_time_span
        train_mask = (data['date'] >= pd.to_datetime(args.train_start_date)) & \
                     (data['date'] <= pd.to_datetime(args.train_end_date))
        val_mask = (data['date'] >= adjusted_val_start) & \
                   (data['date'] <= pd.to_datetime(args.val_end_date))
        test_mask = (data['date'] >= adjusted_test_start) & \
                    (data['date'] <= pd.to_datetime(args.test_end_date))
        train_data = data[train_mask].reset_index(drop=True)
        val_data = data[val_mask].reset_index(drop=True)
        test_data = data[test_mask].reset_index(drop=True)
        train_data = train_data.drop(columns=['date'])
        val_data = val_data.drop(columns=['date'])
        test_data = test_data.drop(columns=['date'])
        # Build supervised datasets
        train_supervised = DataPreprocessor.create_supervised_dataset(args, train_data, 1)
        val_supervised = DataPreprocessor.create_supervised_dataset(args, val_data, 1)
        test_supervised = DataPreprocessor.create_supervised_dataset(args, test_data, args.step_size)
        # Wrap in DataLoaders
        train_loader = DataPreprocessor.load_data(args, train_supervised, shuffle=True)
        val_loader = DataPreprocessor.load_data(args, val_supervised, shuffle=False)
        test_loader = DataPreprocessor.load_data(args, test_supervised, shuffle=False)
        return train_loader, val_loader, test_loader, data

    @staticmethod
    def read_and_combine_csv_files(args):
        """Read the CSV files, select features, and preprocess."""
        current_dir = os.path.dirname(__file__)
        parent_dir = os.path.dirname(current_dir)
        args.data_dir = os.path.join(parent_dir, args.data_dir)

        def read_file(file_count):
            file_name = args.file_pattern.format(file_count)
            file_path = os.path.join(args.data_dir, file_name)
            try:
                df = pd.read_csv(file_path)
                # Keep only the required columns; a missing column raises a KeyError
                return df[DataPreprocessor.COLUMNS_TO_KEEP]
            except KeyError as e:
                print(f"File {file_name} is missing columns: {e}")
                raise

        file_indices = list(range(args.start_files, args.end_files + 1))
        max_workers = os.cpu_count()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(tqdm(executor.map(read_file, file_indices),
                                total=len(file_indices),
                                desc="Reading files"))
        all_data = pd.concat(results, ignore_index=True)
        # Ensure a consistent column order
        all_data = all_data[DataPreprocessor.COLUMNS_TO_KEEP]
        # Downsample: keep every `resolution`-th row
        chunk = all_data.iloc[::args.resolution, :].reset_index(drop=True)
        # Process features
        chunk = DataPreprocessor.process_date(chunk, args)
        chunk = DataPreprocessor.scaler_data(chunk, args)
        return chunk

    @staticmethod
    def process_date(data, args):
        data = data.rename(columns={'index': 'date'})
        data['date'] = pd.to_datetime(data['date'])
        time_features = []
        # Always generate minute-level and day-level features, consistent with the Predictor
        data['minute_of_day'] = data['date'].dt.hour * 60 + data['date'].dt.minute
        data['minute_sin'] = np.sin(2 * np.pi * data['minute_of_day'] / 1440)
        data['minute_cos'] = np.cos(2 * np.pi * data['minute_of_day'] / 1440)
        data['day_of_year'] = data['date'].dt.dayofyear
        data['day_year_sin'] = np.sin(2 * np.pi * data['day_of_year'] / 366)
        data['day_year_cos'] = np.cos(2 * np.pi * data['day_of_year'] / 366)
        time_features.extend(['minute_sin', 'minute_cos', 'day_year_sin', 'day_year_cos'])
        data.drop(columns=['minute_of_day', 'day_of_year'], inplace=True)
        other_columns = [col for col in data.columns if col not in ['date'] and col not in time_features]
        data = data[['date'] + time_features + other_columns]
        return data
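
    # Worked example of the cyclic encoding above (illustrative values): at
    # 06:00, minute_of_day = 360, so minute_sin = sin(2*pi*360/1440) = 1.0 and
    # minute_cos = cos(2*pi*360/1440) is about 0.0. On this circle 23:59 and
    # 00:00 sit next to each other instead of 1439 units apart.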

    @staticmethod
    def scaler_data(data, args):
        date_col = data[['date']]
        data_to_scale = data.drop(columns=['date'])
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data = scaler.fit_transform(data_to_scale)
        # Persist the fitted scaler so predictions can be inverse-transformed later
        joblib.dump(scaler, args.scaler_path)
        scaled_data = pd.DataFrame(scaled_data, columns=data_to_scale.columns)
        scaled_data = pd.concat([date_col.reset_index(drop=True), scaled_data], axis=1)
        return scaled_data
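
    # Sketch (not part of the original pipeline): the scaler persisted above can
    # be reloaded at inference time to map model outputs back to physical units,
    # assuming the array passed in has the same column layout that was fitted:
    #
    #     scaler = joblib.load(args.scaler_path)
    #     restored = scaler.inverse_transform(scaled_rows)  # (n_rows, n_features)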

    @staticmethod
    def create_supervised_dataset(args, data, step_size):
        data = pd.DataFrame(data)
        cols = []
        col_names = []
        feature_columns = data.columns.tolist()
        # Input sequence: lags t-(seq_len-1) .. t-0 for every feature
        for col in feature_columns:
            for i in range(args.seq_len - 1, -1, -1):
                cols.append(data[[col]].shift(i))
                col_names.append(f"{col}(t-{i})")
        # Target sequence: leads t+1 .. t+output_size for the last labels_num columns
        target_columns = feature_columns[-args.labels_num:]
        for i in range(1, args.output_size + 1):
            for col in target_columns:
                cols.append(data[[col]].shift(-i))
                col_names.append(f"{col}(t+{i})")
        dataset = pd.concat(cols, axis=1)
        dataset.columns = col_names
        dataset = dataset.iloc[::step_size, :]
        # Drop rows with incomplete windows; reassign instead of dropna(inplace=True)
        # because the iloc slice above may be a copy
        dataset = dataset.dropna()
        return dataset
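
    # Illustrative column layout (assumed seq_len=3, labels_num=1, output_size=2,
    # target column "y"): f(t-2), f(t-1), f(t-0), ..., y(t-2), y(t-1), y(t-0),
    # y(t+1), y(t+2). All lags of one column are contiguous (feature-major),
    # which is why load_data below reshapes to (n_features, seq_len) and then
    # swaps the axes.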

    @staticmethod
    def load_data(args, dataset, shuffle):
        input_length = args.seq_len
        n_features = args.feature_num
        labels_num = args.labels_num
        n_features_total = n_features * input_length
        n_labels_total = args.output_size * labels_num
        # The first n_features_total columns are the flattened input window,
        # the next n_labels_total columns are the targets
        X = dataset.values[:, :n_features_total]
        y = dataset.values[:, n_features_total:n_features_total + n_labels_total]
        # The supervised frame is feature-major (all lags of one column are
        # contiguous), so reshape to (batch, n_features, seq_len) first, then
        # swap the last two axes to get (batch, seq_len, n_features); reshaping
        # directly to (batch, seq_len, n_features) would scramble the features
        X = X.reshape(X.shape[0], n_features, input_length).transpose(0, 2, 1)
        X = torch.tensor(X, dtype=torch.float32).to(args.device)
        y = torch.tensor(y, dtype=torch.float32).to(args.device)
        dataset_tensor = TensorDataset(X, y)
        # Seed the shuffling generator for a reproducible batch order
        generator = torch.Generator()
        generator.manual_seed(args.random_seed)
        return DataLoader(dataset_tensor, batch_size=args.batch_size, shuffle=shuffle, generator=generator)
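

# Minimal usage sketch. Every value below is an assumption for illustration;
# in the original project `args` would come from the training configuration.
if __name__ == "__main__":
    from types import SimpleNamespace

    args = SimpleNamespace(
        data_dir="data",                # hypothetical, resolved relative to the parent directory
        file_pattern="export_{}.csv",   # hypothetical file naming scheme
        start_files=1,
        end_files=3,
        resolution=15,                  # keep every 15th raw row
        scaler_path="scaler.pkl",       # where the fitted MinMaxScaler is dumped
        train_start_date="2024-01-01",
        train_end_date="2024-03-31",
        val_start_date="2024-04-01",
        val_end_date="2024-04-30",
        test_start_date="2024-05-01",
        test_end_date="2024-05-31",
        seq_len=48,
        output_size=12,
        step_size=12,
        feature_num=32,                 # assumed: 28 sensor columns + 4 time features
        labels_num=1,                   # predict only the last processed column
        batch_size=64,
        random_seed=42,
        device="cuda" if torch.cuda.is_available() else "cpu",
    )
    processed = DataPreprocessor.read_and_combine_csv_files(args)
    train_loader, val_loader, test_loader, _ = DataPreprocessor.load_and_process_data(args, processed)
    print(f"batches: train={len(train_loader)}, val={len(val_loader)}, test={len(test_loader)}")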