data_processing.py
# -*- coding: utf-8 -*-
"""
data_processing.py: Layer 1 - quantitative anomaly characterization.

This module turns large volumes of raw sensor time-series data, possibly
containing missing values, into a standardized 0-1 anomaly score matrix that
the rest of the system can understand. Core logic covers data downsampling,
dual anomaly scoring, and sliding-window aggregation.
"""
import gc
import multiprocessing
import os

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from tqdm import tqdm

from config import config


def _process_single_file_task(file_idx, file_path, sensor_list, target_interval):
    """
    Read and downsample a single file (runs in a worker process).
    """
    if not os.path.exists(file_path):
        return None
    try:
        # 1. Fast read
        df_temp = pd.read_csv(file_path, index_col=None, low_memory=False)
        if df_temp.empty:
            return None
        # 2. Parse timestamps (mixed formats)
        time_col = df_temp.columns[0]
        df_temp[time_col] = pd.to_datetime(
            df_temp[time_col],
            format='mixed',
            errors='coerce'
        )
        df_temp = df_temp.dropna(subset=[time_col]).set_index(time_col)
        # 3. Keep only known sensor columns
        valid_cols = [c for c in df_temp.columns if c in sensor_list]
        if not valid_cols:
            return None
        df_valid = df_temp[valid_cols]
        # 4. Downsample immediately (e.g. 4 s -> 20 s) and convert dtype
        df_resampled = df_valid.resample(f"{target_interval}s").mean()
        df_resampled = df_resampled.astype(np.float32)
        return df_resampled
    except Exception as e:
        print(f"Error processing file {file_idx}: {e}")
        return None


def _calculate_window_chunk(start_indices, values, win_len, valid_ratio, threshold_val=0.95):
    """
    Window-computation task chunk (runs in a worker process).
    Packs consecutive time points into windows (e.g. 40 min = 120 points) and
    computes an aggregate anomaly level for each window.
    """
    chunk_results = []
    for start in start_indices:
        win_data = values[start : start + win_len, :]
        # Vectorized validity check: fraction of non-NaN points per column
        valid_counts = np.sum(~np.isnan(win_data), axis=0)
        valid_ratios = valid_counts / win_len
        win_res = np.zeros(win_data.shape[1], dtype=np.float32)
        valid_mask = valid_ratios >= valid_ratio
        if np.any(valid_mask):
            # Use the 95th percentile within the window as its representative
            # anomaly score, filtering out sporadic isolated noise spikes
            quantile_scores = np.nanquantile(win_data[:, valid_mask], threshold_val, axis=0)
            win_res[valid_mask] = quantile_scores
        # Clamp scores to the 0-1 range
        chunk_results.append(np.clip(win_res, 0, 1))
    return chunk_results
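

# A worked example of the quantile aggregation above (hypothetical numbers):
# with win_len=120 and threshold_val=0.95, the window score is the 95th
# percentile of the point scores, i.e. roughly the sixth- or seventh-highest
# of 120 points. A single spike of 1.0 among otherwise-zero scores is
# therefore ignored, while a fault sustained for more than ~6 points
# (about 2 minutes at 20 s sampling) pushes the window score up.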


class DataAnomalyProcessor:
    def __init__(self):
        self.threshold_path = config.THRESHOLD_FILENAME
        self.threshold_df = self._load_thresholds()
        self.sensor_list = self.threshold_df['ID'].tolist()
        self.threshold_dict = self.threshold_df.set_index('ID').to_dict('index')
        # Degree of parallelism; tune these to the available CPU cores
        self.n_jobs_io = 24
        self.n_jobs_cpu = 64

    def _load_thresholds(self):
        """Load the threshold table."""
        try:
            df = pd.read_excel(self.threshold_path)
        except Exception as e:
            raise ValueError(f"Failed to read threshold table: {e}")
        required_cols = ['ID', 'Direction', 'Hard_min', 'Hard_max', 'Good_min', 'Good_max']
        for col in required_cols:
            if col not in df.columns:
                # Tolerate case differences in column names
                for c in df.columns:
                    if c.lower() == col.lower():
                        df.rename(columns={c: col}, inplace=True)
                        break
        cols_to_numeric = ['Hard_min', 'Hard_max', 'Good_min', 'Good_max', 'Alarm_time']
        for c in cols_to_numeric:
            if c in df.columns:
                df[c] = pd.to_numeric(df[c], errors='coerce')
        # Guard against a completely missing Alarm_time column
        if 'Alarm_time' not in df.columns:
            df['Alarm_time'] = np.nan
        # Missing bounds default to +/-inf (no constraint); missing alarm times to 60
        df['Hard_min'] = df['Hard_min'].fillna(-np.inf)
        df['Hard_max'] = df['Hard_max'].fillna(np.inf)
        df['Good_min'] = df['Good_min'].fillna(-np.inf)
        df['Good_max'] = df['Good_max'].fillna(np.inf)
        df['Alarm_time'] = df['Alarm_time'].fillna(60)
        return df
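
    # Expected threshold-table layout (illustrative IDs and values only; the
    # real file lives at config.THRESHOLD_FILENAME):
    #
    #   ID           Direction  Hard_min  Hard_max  Good_min  Good_max  Alarm_time
    #   temp_01      high       -40       150       0         80        30
    #   pressure_02  both       0         10        2         8         60
    #
    # Hard_* bounds mark physically impossible readings (treated as NaN);
    # Good_* bounds delimit the normal operating band.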

    def _calculate_point_score_vectorized(self, series, sensor_name):
        """Dual anomaly scoring (absolute thresholds + dynamic MAD), fully vectorized."""
        if sensor_name not in self.threshold_dict:
            return pd.Series(0.0, index=series.index, dtype=np.float32)
        info = self.threshold_dict[sensor_name]
        vals = series.astype(np.float32).copy()
        # 1. Hard physical limits: readings beyond the physical range are
        # treated as invalid / sensor faults and set to NaN
        mask_invalid = (vals < info['Hard_min']) | (vals > info['Hard_max'])
        vals[mask_invalid] = np.nan
        # If everything is NaN, return all-zero scores
        if vals.isna().all():
            return pd.Series(0.0, index=vals.index, dtype=np.float32)
        # ==================== (A) Absolute-threshold score ====================
        abs_scores = pd.Series(0.0, index=vals.index, dtype=np.float32)
        direction = str(info['Direction']).strip().lower()
        # Low-side anomaly: values below Good_min but above Hard_min get a
        # linearly interpolated anomaly degree in [0, 1]
        if direction in ['low', 'both']:
            mask_low = (vals < info['Good_min']) & (vals >= info['Hard_min'])
            denom = info['Good_min'] - info['Hard_min']
            if denom > 1e-5:
                abs_scores[mask_low] = (info['Good_min'] - vals[mask_low]) / denom
            else:
                abs_scores[mask_low] = 1.0
        # High-side anomaly: values above Good_max but below Hard_max
        if direction in ['high', 'both']:
            mask_high = (vals > info['Good_max']) & (vals <= info['Hard_max'])
            denom = info['Hard_max'] - info['Good_max']
            if denom > 1e-5:
                abs_scores[mask_high] = (vals[mask_high] - info['Good_max']) / denom
            else:
                abs_scores[mask_high] = 1.0
        # Time weighting from the alarm deadline: the faster an alarm is
        # required, the more the anomaly score is amplified
        alarm_t = max(info['Alarm_time'], 1.0)
        time_weight = 1.0 + (30.0 / alarm_t)
        abs_scores = (abs_scores * time_weight).clip(0, 1)
        # ==================== (B) Dynamic MAD score ====================
        # MAD (median absolute deviation) catches sudden drops/jumps in the
        # data without relying on hand-set thresholds.
        # Rolling median over a recent history window serves as the dynamic baseline
        rolling_median = vals.rolling(
            window=config.MAD_HISTORY_WINDOW,
            min_periods=1
        ).median()
        # Absolute deviation |x_i - median|
        abs_deviation = (vals - rolling_median).abs()
        # MAD = median(|x_i - median|)
        rolling_mad = abs_deviation.rolling(
            window=config.MAD_HISTORY_WINDOW,
            min_periods=1
        ).median()
        # Dynamic score: map the deviation into [0, 1] by dividing by
        # (MAD_THRESHOLD * MAD); the 1e-5 guards against division by zero
        dyn_scores = (abs_deviation / (rolling_mad * config.MAD_THRESHOLD + 1e-5)).clip(0, 1)
        # ==================== (C) Weighted combination ====================
        final_scores = (config.ABSOLUTE_SCORE_WEIGHT * abs_scores) + (config.DYNAMIC_SCORE_WEIGHT * dyn_scores)
        # Restore NaN for invalid readings
        final_scores[mask_invalid] = np.nan
        return final_scores
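
    # Worked example of the absolute score (hypothetical thresholds): with
    # Direction='high', Good_max=80, Hard_max=100 and Alarm_time=60, a
    # reading of 90 scores (90 - 80) / (100 - 80) = 0.5, then is scaled by
    # the time weight 1 + 30/60 = 1.5 to 0.75; a reading of 105 exceeds
    # Hard_max and becomes NaN instead of being scored.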

    def process(self):
        """
        Main pipeline: read files -> downsample -> anomaly scoring ->
        window aggregation -> train/test split.
        """
        print(f">>> [Step 1] Data processing started | CPU cores detected: {multiprocessing.cpu_count()}")
        # 1. Parallel reading and downsampling
        file_indices = list(range(config.SENSOR_FILE_NUM_RANGE[0], config.SENSOR_FILE_NUM_RANGE[1] + 1))
        file_paths = [os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}{i}.csv") for i in file_indices]
        print(f"Reading {len(file_indices)} files in parallel (workers: {self.n_jobs_io})...")
        # Run _process_single_file_task in parallel via joblib
        results = Parallel(n_jobs=self.n_jobs_io, backend="loky", verbose=5)(
            delayed(_process_single_file_task)(
                idx, path, self.sensor_list, config.TARGET_SAMPLE_INTERVAL
            ) for idx, path in zip(file_indices, file_paths)
        )
        # Drop None results
        all_series_list = [r for r in results if r is not None]
        if not all_series_list:
            raise ValueError("No valid data was read")
        # 2. Merge the data
        print("Concatenating data along the time axis...")
        resampled_df = pd.concat(all_series_list, axis=0).sort_index()
        resampled_df = resampled_df[~resampled_df.index.duplicated(keep='first')]
        # Free memory
        del all_series_list, results
        gc.collect()
        # 3. Point-level scores
        print("Computing point-level anomaly scores...")
        scores_dict = {}
        for sensor in tqdm(self.sensor_list, desc="Scoring"):
            if sensor in resampled_df.columns:
                scores_dict[sensor] = self._calculate_point_score_vectorized(resampled_df[sensor], sensor)
            else:
                scores_dict[sensor] = pd.Series(np.nan, index=resampled_df.index, dtype=np.float32)
        point_score_df = pd.DataFrame(scores_dict)
        point_score_df = point_score_df[self.sensor_list]  # align column order
        # 4. Parallel window aggregation (CPU-bound)
        print(f"Computing window quantiles in parallel (workers: {self.n_jobs_cpu})...")
        values = point_score_df.values
        win_len = config.POINTS_PER_WINDOW
        step = config.WINDOW_STEP
        total_len = len(values)
        if total_len < win_len:
            return np.array([]), np.array([]), self.threshold_df
        # Generate every window's start index (+1 so the final full window is kept)
        all_start_indices = list(range(0, total_len - win_len + 1, step))
        # Split the indices into chunks; each chunk becomes one parallel task
        num_chunks = self.n_jobs_cpu * 4
        chunk_size = max(1, len(all_start_indices) // num_chunks)
        chunks = [all_start_indices[i:i + chunk_size] for i in range(0, len(all_start_indices), chunk_size)]
        # Parallel computation
        chunk_results = Parallel(n_jobs=self.n_jobs_cpu, backend="loky", verbose=5)(
            delayed(_calculate_window_chunk)(
                chunk, values, win_len, config.VALID_DATA_RATIO
            ) for chunk in chunks
        )
        # Merge results
        window_scores = [item for sublist in chunk_results for item in sublist]
        final_scores = np.nan_to_num(np.array(window_scores, dtype=np.float32), nan=0.0)
        # 5. Train/test split
        total_samples = final_scores.shape[0]
        if total_samples == 0:
            raise ValueError("Window count is 0")
        split_idx = int(total_samples * config.TRAIN_TEST_SPLIT)
        train_scores = final_scores[:split_idx]
        test_scores = final_scores[split_idx:]
        print(f"Done: train {len(train_scores)} | test {len(test_scores)}")
        return train_scores, test_scores, self.threshold_df
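

# A minimal usage sketch (an assumption, not part of the original module):
# runs the full pipeline and reports the resulting matrix shapes. Both score
# matrices are shaped (num_windows, num_sensors).
if __name__ == "__main__":
    processor = DataAnomalyProcessor()
    train_scores, test_scores, threshold_df = processor.process()
    print(f"train: {train_scores.shape} | test: {test_scores.shape} "
          f"| sensors: {len(threshold_df)}")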