# -*- coding: utf-8 -*-
"""data_processing.py: Layer 1 - quantitative characterization of anomalies."""
import pandas as pd
import numpy as np
import os
from tqdm import tqdm
from config import config
import gc
from joblib import Parallel, delayed
import multiprocessing


def _process_single_file_task(file_idx, file_path, sensor_list, target_interval):
    """
    Read and downsample a single file (runs in a worker subprocess).
    """
    if not os.path.exists(file_path):
        return None
    try:
        # 1. Fast read
        df_temp = pd.read_csv(file_path, index_col=None, low_memory=False)
        if df_temp.empty:
            return None
        # 2. Timestamp parsing (mixed formats)
        time_col = df_temp.columns[0]
        df_temp[time_col] = pd.to_datetime(
            df_temp[time_col],
            format='mixed',
            errors='coerce'
        )
        df_temp = df_temp.dropna(subset=[time_col]).set_index(time_col)
        # 3. Keep only the sensors listed in the threshold table
        valid_cols = [c for c in df_temp.columns if c in sensor_list]
        if not valid_cols:
            return None
        df_valid = df_temp[valid_cols]
        # 4. Downsample immediately (4 s -> 20 s) and convert dtypes
        df_resampled = df_valid.resample(f"{target_interval}s").mean()
        df_resampled = df_resampled.astype(np.float32)
        return df_resampled
    except Exception as e:
        print(f"Error while processing file {file_idx}: {e}")
        return None
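
# Example of a single-file call (the path, sensor IDs, and the 20 s interval below
# are purely illustrative, not taken from config):
#   df = _process_single_file_task(1, "sensors/sensor_1.csv", ["S1", "S2"], 20)
# Returns a float32 DataFrame on a 20-second grid, or None if the file is missing,
# empty, or unreadable.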


def _calculate_window_chunk(start_indices, values, win_len, valid_ratio, threshold_val=0.95):
    """
    Window-aggregation task chunk (runs in a worker subprocess).
    Computes the nanquantile for a batch of windows.
    """
    chunk_results = []
    for start in start_indices:
        win_data = values[start : start + win_len, :]
        # Vectorized validity check: count non-NaN samples per sensor (axis=0 is the time axis)
        valid_counts = np.sum(~np.isnan(win_data), axis=0)
        valid_ratios = valid_counts / win_len
        win_res = np.zeros(win_data.shape[1], dtype=np.float32)
        valid_mask = valid_ratios >= valid_ratio
        if np.any(valid_mask):
            # Quantile score for the sensors that have enough valid data in this window
            quantile_scores = np.nanquantile(win_data[:, valid_mask], threshold_val, axis=0)
            win_res[valid_mask] = quantile_scores
        chunk_results.append(np.clip(win_res, 0, 1))
    return chunk_results
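
# Intuition for the window aggregation (win_len and shapes are hypothetical): with
# point scores of shape (T, n_sensors) and, say, win_len = 90, each window reduces
# 90 consecutive point scores per sensor to their 95th percentile (threshold_val),
# so a sensor only scores high when a sustained portion of the window is anomalous,
# not on a single spike.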


class DataAnomalyProcessor:
    def __init__(self):
        self.threshold_path = os.path.join(config.BASE_DIR, config.THRESHOLD_FILENAME)
        self.threshold_df = self._load_thresholds()
        self.sensor_list = self.threshold_df['ID'].tolist()
        self.threshold_dict = self.threshold_df.set_index('ID').to_dict('index')
        # Worker counts for the I/O-bound and CPU-bound stages (tune to the host's core count)
        self.n_jobs_io = 24
        self.n_jobs_cpu = 64

    def _load_thresholds(self):
        """Load the threshold table."""
        try:
            df = pd.read_excel(self.threshold_path)
        except Exception as e:
            raise ValueError(f"Failed to read the threshold table: {e}")
        required_cols = ['ID', 'Direction', 'Hard_min', 'Hard_max', 'Good_min', 'Good_max']
        for col in required_cols:
            if col not in df.columns:
                # Fall back to a case-insensitive match for the column name
                for c in df.columns:
                    if c.lower() == col.lower():
                        df.rename(columns={c: col}, inplace=True)
                        break
        cols_to_numeric = ['Hard_min', 'Hard_max', 'Good_min', 'Good_max', 'Alarm_time']
        for c in cols_to_numeric:
            if c in df.columns:
                df[c] = pd.to_numeric(df[c], errors='coerce')
        df['Hard_min'] = df['Hard_min'].fillna(-np.inf)
        df['Hard_max'] = df['Hard_max'].fillna(np.inf)
        df['Good_min'] = df['Good_min'].fillna(-np.inf)
        df['Good_max'] = df['Good_max'].fillna(np.inf)
        df['Alarm_time'] = df['Alarm_time'].fillna(60)
        return df
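
    # Expected threshold-table layout, one row per sensor ID (the rows below are
    # illustrative, not real sensors):
    #   ID   Direction  Hard_min  Hard_max  Good_min  Good_max  Alarm_time
    #   S1   high       0         120       NaN       80        60
    #   S2   both       -10       10        -5        5         30
    # Missing hard/good limits fall back to +/-inf and a missing Alarm_time to 60,
    # so partially filled rows are still scored.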

    def _calculate_point_score_vectorized(self, series, sensor_name):
        # Vectorized point-level scoring for one sensor
        if sensor_name not in self.threshold_dict:
            return pd.Series(0.0, index=series.index, dtype=np.float32)
        info = self.threshold_dict[sensor_name]
        vals = series.astype(np.float32).copy()
        # 1. Hard-threshold mask (values outside the hard limits are treated as invalid/missing)
        mask_invalid = (vals < info['Hard_min']) | (vals > info['Hard_max'])
        vals[mask_invalid] = np.nan
        # If everything is NaN, return all zeros
        if vals.isna().all():
            return pd.Series(0.0, index=vals.index, dtype=np.float32)
        # ==================== (A) Absolute-threshold score ====================
        abs_scores = pd.Series(0.0, index=vals.index, dtype=np.float32)
        direction = str(info['Direction']).strip().lower()
        if direction in ['low', 'both']:
            mask_low = (vals < info['Good_min']) & (vals >= info['Hard_min'])
            denom = info['Good_min'] - info['Hard_min']
            if denom > 1e-5:
                abs_scores[mask_low] = (info['Good_min'] - vals[mask_low]) / denom
            else:
                abs_scores[mask_low] = 1.0
        if direction in ['high', 'both']:
            mask_high = (vals > info['Good_max']) & (vals <= info['Hard_max'])
            denom = info['Hard_max'] - info['Good_max']
            if denom > 1e-5:
                abs_scores[mask_high] = (vals[mask_high] - info['Good_max']) / denom
            else:
                abs_scores[mask_high] = 1.0
        alarm_t = max(info['Alarm_time'], 1.0)
        time_weight = 1.0 + (30.0 / alarm_t)
        abs_scores = (abs_scores * time_weight).clip(0, 1)
        # ==================== (B) Dynamic MAD score ====================
        # Rolling median over the history window
        rolling_median = vals.rolling(
            window=config.MAD_HISTORY_WINDOW,
            min_periods=1
        ).median()
        # Absolute deviation |x_i - median|
        abs_deviation = (vals - rolling_median).abs()
        # MAD = median(|x_i - median|)
        rolling_mad = abs_deviation.rolling(
            window=config.MAD_HISTORY_WINDOW,
            min_periods=1
        ).median()
        # Dynamic score: map the deviation into [0, 1] by dividing by (MAD_THRESHOLD * MAD);
        # the 1e-5 term guards against division by zero
        dyn_scores = (abs_deviation / (rolling_mad * config.MAD_THRESHOLD + 1e-5)).clip(0, 1)
        # ==================== (C) Weighted combination ====================
        final_scores = (config.ABSOLUTE_SCORE_WEIGHT * abs_scores) + (config.DYNAMIC_SCORE_WEIGHT * dyn_scores)
        # Restore NaN for the invalid samples
        final_scores[mask_invalid] = np.nan
        return final_scores
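
    # Worked example of the absolute score (thresholds hypothetical): with
    # Direction = 'high', Good_max = 80, Hard_max = 100 and Alarm_time = 60,
    # a reading of 90 maps to (90 - 80) / (100 - 80) = 0.5, then the time weight
    # 1 + 30 / 60 = 1.5 lifts it to 0.75 before clipping to [0, 1]. The returned
    # point score blends this with the rolling-MAD score via the
    # ABSOLUTE_SCORE_WEIGHT / DYNAMIC_SCORE_WEIGHT settings in config.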

    def process(self):
        print(f">>> [Step 1] Data processing started | CPU cores detected: {multiprocessing.cpu_count()}")
        # 1. Parallel reading and downsampling
        file_indices = list(range(config.SENSOR_FILE_NUM_RANGE[0], config.SENSOR_FILE_NUM_RANGE[1] + 1))
        file_paths = [os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}{i}.csv") for i in file_indices]
        print(f"Reading {len(file_indices)} files in parallel (workers: {self.n_jobs_io})...")
        # Run _process_single_file_task across worker processes with joblib
        results = Parallel(n_jobs=self.n_jobs_io, backend="loky", verbose=5)(
            delayed(_process_single_file_task)(
                idx, path, self.sensor_list, config.TARGET_SAMPLE_INTERVAL
            ) for idx, path in zip(file_indices, file_paths)
        )
        # Drop the None results
        all_series_list = [r for r in results if r is not None]
        if not all_series_list:
            raise ValueError("No valid data could be read")
        # 2. Merge the data
        print("Concatenating files along the time axis...")
        resampled_df = pd.concat(all_series_list, axis=0).sort_index()
        resampled_df = resampled_df[~resampled_df.index.duplicated(keep='first')]
        # Free memory
        del all_series_list, results
        gc.collect()
        # 3. Point-level scores
        print("Computing point-level anomaly scores...")
        scores_dict = {}
        for sensor in tqdm(self.sensor_list, desc="Scoring"):
            if sensor in resampled_df.columns:
                scores_dict[sensor] = self._calculate_point_score_vectorized(resampled_df[sensor], sensor)
            else:
                scores_dict[sensor] = pd.Series(np.nan, index=resampled_df.index, dtype=np.float32)
        point_score_df = pd.DataFrame(scores_dict)
        point_score_df = point_score_df[self.sensor_list]  # Align column order
        # 4. Parallel window aggregation (CPU-bound)
        print(f"Computing window quantiles in parallel (workers: {self.n_jobs_cpu})...")
        values = point_score_df.values
        win_len = config.POINTS_PER_WINDOW
        step = config.WINDOW_STEP
        total_len = len(values)
        if total_len < win_len:
            return np.array([]), np.array([]), self.threshold_df
        # Start index of every window
        all_start_indices = list(range(0, total_len - win_len, step))
        # Split the indices into chunks, several per worker, for load balancing
        num_chunks = self.n_jobs_cpu * 4
        chunk_size = max(1, len(all_start_indices) // num_chunks)
        chunks = [all_start_indices[i:i + chunk_size] for i in range(0, len(all_start_indices), chunk_size)]
        # Parallel window computation
        chunk_results = Parallel(n_jobs=self.n_jobs_cpu, backend="loky", verbose=5)(
            delayed(_calculate_window_chunk)(
                chunk, values, win_len, config.VALID_DATA_RATIO
            ) for chunk in chunks
        )
        # Merge the chunk results
        window_scores = [item for sublist in chunk_results for item in sublist]
        final_scores = np.nan_to_num(np.array(window_scores, dtype=np.float32), nan=0.0)
        # 5. Train/test split
        total_samples = final_scores.shape[0]
        if total_samples == 0:
            raise ValueError("No windows were produced")
        split_idx = int(total_samples * config.TRAIN_TEST_SPLIT)
        train_scores = final_scores[:split_idx]
        test_scores = final_scores[split_idx:]
        print(f"Done: train {len(train_scores)} windows | test {len(test_scores)} windows")
        return train_scores, test_scores, self.threshold_df
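

# Minimal usage sketch (assumes config points at valid sensor CSVs and a threshold
# workbook; the output filenames below are illustrative, not part of this module):
if __name__ == "__main__":
    processor = DataAnomalyProcessor()
    train_scores, test_scores, threshold_df = processor.process()
    np.save("train_window_scores.npy", train_scores)
    np.save("test_window_scores.npy", test_scores)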