# -*- coding: utf-8 -*-
"""data_processing.py: Layer 1 - anomaly quantification.

Pipeline:
  1. Read raw sensor CSVs in parallel and downsample them.
  2. Merge into one time-indexed frame.
  3. Compute a per-point anomaly score per sensor (absolute thresholds + MAD).
  4. Aggregate point scores into sliding-window quantile scores, in parallel.
  5. Split chronologically into train/test sets.
"""
import gc
import multiprocessing
import os

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from tqdm import tqdm

from config import config


def _process_single_file_task(file_idx, file_path, sensor_list, target_interval):
    """Read and downsample one sensor CSV (runs in a worker process).

    Parameters
    ----------
    file_idx : int
        File index, used only in the error message.
    file_path : str
        CSV path; the first column is expected to hold timestamps.
    sensor_list : list[str]
        Sensor IDs to keep; other columns are discarded.
    target_interval : int
        Resample interval in seconds (e.g. raw 4s -> 20s).

    Returns
    -------
    pandas.DataFrame | None
        float32 time-indexed resampled frame, or None when the file is
        missing, empty, has no known sensor columns, or fails to parse.
    """
    if not os.path.exists(file_path):
        return None
    try:
        # 1. Fast read; low_memory=False avoids mixed-dtype chunk inference.
        df_temp = pd.read_csv(file_path, index_col=None, low_memory=False)
        if df_temp.empty:
            return None

        # 2. Parse timestamps ('mixed' format); unparseable rows are dropped.
        time_col = df_temp.columns[0]
        df_temp[time_col] = pd.to_datetime(
            df_temp[time_col], format='mixed', errors='coerce'
        )
        df_temp = df_temp.dropna(subset=[time_col]).set_index(time_col)

        # 3. Keep only columns that appear in the threshold table.
        valid_cols = [c for c in df_temp.columns if c in sensor_list]
        if not valid_cols:
            return None
        df_valid = df_temp[valid_cols]

        # 4. Downsample immediately and shrink dtype to cut memory before
        #    the frames are shipped back to the parent process.
        df_resampled = df_valid.resample(f"{target_interval}s").mean()
        return df_resampled.astype(np.float32)
    except Exception as e:
        print(f"文件 {file_idx} 处理出错: {e}")
        return None


def _calculate_window_chunk(start_indices, values, win_len, valid_ratio, threshold_val=0.95):
    """Compute per-window quantile scores for a batch of windows (worker task).

    Parameters
    ----------
    start_indices : list[int]
        Window start row indices into ``values``.
    values : numpy.ndarray
        2-D array (time x sensors) of point scores; may contain NaN.
    win_len : int
        Window length in rows.
    valid_ratio : float
        Minimum fraction of non-NaN points required for a column's score
        to be computed; columns below it score 0.
    threshold_val : float, optional
        Quantile taken over each valid column (default 0.95).

    Returns
    -------
    list[numpy.ndarray]
        One float32 score vector (clipped to [0, 1]) per start index.
    """
    chunk_results = []
    for start in start_indices:
        win_data = values[start: start + win_len, :]

        # Vectorized validity check along the time axis (axis=0).
        valid_counts = np.sum(~np.isnan(win_data), axis=0)
        valid_ratios = valid_counts / win_len

        win_res = np.zeros(win_data.shape[1], dtype=np.float32)
        valid_mask = valid_ratios >= valid_ratio
        if np.any(valid_mask):
            # nanquantile only over columns with enough data.
            quantile_scores = np.nanquantile(
                win_data[:, valid_mask], threshold_val, axis=0
            )
            win_res[valid_mask] = quantile_scores
        chunk_results.append(np.clip(win_res, 0, 1))
    return chunk_results


class DataAnomalyProcessor:
    """Layer 1 processor: raw sensor files -> windowed anomaly scores."""

    def __init__(self):
        self.threshold_path = os.path.join(config.BASE_DIR, config.THRESHOLD_FILENAME)
        self.threshold_df = self._load_thresholds()
        self.sensor_list = self.threshold_df['ID'].tolist()
        # Per-sensor threshold rows keyed by sensor ID for O(1) lookup.
        self.threshold_dict = self.threshold_df.set_index('ID').to_dict('index')
        # Parallelism: cap the tuned defaults by the actual core count so
        # the pools are not oversubscribed on smaller machines (the original
        # hard-coded 24/64 regardless of the comment promising CPU-based sizing).
        n_cores = multiprocessing.cpu_count()
        self.n_jobs_io = min(24, n_cores)
        self.n_jobs_cpu = min(64, n_cores)

    def _load_thresholds(self):
        """Load and normalize the threshold spreadsheet.

        Returns
        -------
        pandas.DataFrame
            One row per sensor with columns ID, Direction, Hard_min,
            Hard_max, Good_min, Good_max, Alarm_time. Missing hard/good
            bounds default to +/-inf; missing Alarm_time defaults to 60.

        Raises
        ------
        ValueError
            If the spreadsheet cannot be read or a required column is absent.
        """
        try:
            df = pd.read_excel(self.threshold_path)
        except Exception as e:
            raise ValueError(f"读取阈值表失败: {e}")

        required_cols = ['ID', 'Direction', 'Hard_min', 'Hard_max', 'Good_min', 'Good_max']
        for col in required_cols:
            if col not in df.columns:
                # Tolerate case differences in the spreadsheet header.
                for c in df.columns:
                    if c.lower() == col.lower():
                        df.rename(columns={c: col}, inplace=True)
                        break

        # Fail fast instead of a cryptic KeyError further down the pipeline.
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            raise ValueError(f"阈值表缺少必需列: {missing}")

        cols_to_numeric = ['Hard_min', 'Hard_max', 'Good_min', 'Good_max', 'Alarm_time']
        for c in cols_to_numeric:
            if c in df.columns:
                df[c] = pd.to_numeric(df[c], errors='coerce')

        # Open-ended bounds where the sheet leaves a cell blank.
        df['Hard_min'] = df['Hard_min'].fillna(-np.inf)
        df['Hard_max'] = df['Hard_max'].fillna(np.inf)
        df['Good_min'] = df['Good_min'].fillna(-np.inf)
        df['Good_max'] = df['Good_max'].fillna(np.inf)

        # BUG FIX: 'Alarm_time' may be absent entirely; the original
        # unconditional fillna raised KeyError in that case.
        if 'Alarm_time' in df.columns:
            df['Alarm_time'] = df['Alarm_time'].fillna(60)
        else:
            df['Alarm_time'] = 60
        return df

    def _calculate_point_score_vectorized(self, series, sensor_name):
        """Vectorized per-point anomaly score for one sensor series.

        The score blends (A) an absolute-threshold score between the "good"
        and "hard" bounds with (B) a rolling-MAD dynamic score, weighted by
        config.ABSOLUTE_SCORE_WEIGHT / config.DYNAMIC_SCORE_WEIGHT.

        Parameters
        ----------
        series : pandas.Series
            Raw resampled sensor values, time-indexed.
        sensor_name : str
            Sensor ID used to look up its threshold row.

        Returns
        -------
        pandas.Series
            float32 scores in [0, 1]; NaN where the raw value fell outside
            the hard limits (treated as invalid data).
        """
        if sensor_name not in self.threshold_dict:
            return pd.Series(0.0, index=series.index, dtype=np.float32)

        info = self.threshold_dict[sensor_name]
        vals = series.astype(np.float32).copy()

        # 1. Hard-limit mask: values outside the hard range are invalid/missing.
        mask_invalid = (vals < info['Hard_min']) | (vals > info['Hard_max'])
        vals[mask_invalid] = np.nan

        # All NaN -> nothing to score.
        if vals.isna().all():
            return pd.Series(0.0, index=vals.index, dtype=np.float32)

        # ==================== (A) Absolute-threshold score ====================
        abs_scores = pd.Series(0.0, index=vals.index, dtype=np.float32)
        direction = str(info['Direction']).strip().lower()

        if direction in ['low', 'both']:
            mask_low = (vals < info['Good_min']) & (vals >= info['Hard_min'])
            denom = info['Good_min'] - info['Hard_min']
            if denom > 1e-5:
                # Linear ramp from 0 at Good_min to 1 at Hard_min.
                abs_scores[mask_low] = (info['Good_min'] - vals[mask_low]) / denom
            else:
                abs_scores[mask_low] = 1.0

        if direction in ['high', 'both']:
            mask_high = (vals > info['Good_max']) & (vals <= info['Hard_max'])
            denom = info['Hard_max'] - info['Good_max']
            if denom > 1e-5:
                # Linear ramp from 0 at Good_max to 1 at Hard_max.
                abs_scores[mask_high] = (vals[mask_high] - info['Good_max']) / denom
            else:
                abs_scores[mask_high] = 1.0

        # Shorter alarm times weight the absolute score up (faster-acting alarms).
        alarm_t = max(info['Alarm_time'], 1.0)
        time_weight = 1.0 + (30.0 / alarm_t)
        abs_scores = (abs_scores * time_weight).clip(0, 1)

        # ==================== (B) Dynamic MAD score ====================
        # Rolling median over the history window.
        rolling_median = vals.rolling(
            window=config.MAD_HISTORY_WINDOW, min_periods=1
        ).median()

        # Absolute deviation |x_i - median|.
        abs_deviation = (vals - rolling_median).abs()

        # MAD = median(|x_i - median|) over the same window.
        rolling_mad = abs_deviation.rolling(
            window=config.MAD_HISTORY_WINDOW, min_periods=1
        ).median()

        # Map deviation into [0, 1] via MAD_THRESHOLD * MAD; +1e-5 guards
        # against division by zero when the MAD collapses to 0.
        dyn_scores = (
            abs_deviation / (rolling_mad * config.MAD_THRESHOLD + 1e-5)
        ).clip(0, 1)

        # ==================== (C) Weighted blend ====================
        final_scores = (
            config.ABSOLUTE_SCORE_WEIGHT * abs_scores
            + config.DYNAMIC_SCORE_WEIGHT * dyn_scores
        )

        # Restore NaN for invalid raw points so downstream validity counting works.
        final_scores[mask_invalid] = np.nan
        return final_scores

    def process(self):
        """Run the full layer-1 pipeline.

        Returns
        -------
        tuple[numpy.ndarray, numpy.ndarray, pandas.DataFrame]
            (train window scores, test window scores, threshold table).
            Empty arrays are returned when there is not enough data for a
            single window.

        Raises
        ------
        ValueError
            If no input file yields usable data, or no window is produced.
        """
        print(f">>> [Step 1] 数据处理启动 | 检测到 CPU 核心数: {multiprocessing.cpu_count()}")

        # 1. Parallel read + downsample.
        file_indices = list(range(config.SENSOR_FILE_NUM_RANGE[0],
                                  config.SENSOR_FILE_NUM_RANGE[1] + 1))
        file_paths = [
            os.path.join(config.DATASET_SENSOR_DIR,
                         f"{config.SENSOR_FILE_PREFIX}{i}.csv")
            for i in file_indices
        ]

        print(f"正在并行读取 {len(file_indices)} 个文件 (并发数: {self.n_jobs_io})...")
        results = Parallel(n_jobs=self.n_jobs_io, backend="loky", verbose=5)(
            delayed(_process_single_file_task)(
                idx, path, self.sensor_list, config.TARGET_SAMPLE_INTERVAL
            )
            for idx, path in zip(file_indices, file_paths)
        )

        # Drop files that failed or were empty.
        all_series_list = [r for r in results if r is not None]
        if not all_series_list:
            raise ValueError("没有读取到有效数据")

        # 2. Vertical merge, time-ordered, de-duplicated on timestamp.
        print("正在纵向合并数据...")
        resampled_df = pd.concat(all_series_list, axis=0).sort_index()
        resampled_df = resampled_df[~resampled_df.index.duplicated(keep='first')]

        # Free the per-file frames before the next large allocation.
        del all_series_list, results
        gc.collect()

        # 3. Point-level anomaly scores.
        print("计算点级异常得分...")
        scores_dict = {}
        for sensor in tqdm(self.sensor_list, desc="计算得分"):
            if sensor in resampled_df.columns:
                scores_dict[sensor] = self._calculate_point_score_vectorized(
                    resampled_df[sensor], sensor
                )
            else:
                scores_dict[sensor] = pd.Series(
                    np.nan, index=resampled_df.index, dtype=np.float32
                )
        point_score_df = pd.DataFrame(scores_dict)
        point_score_df = point_score_df[self.sensor_list]  # align column order

        # 4. Parallel window aggregation (CPU-bound).
        print(f"正在并行计算窗口分位数 (并发数: {self.n_jobs_cpu})...")
        values = point_score_df.values
        win_len = config.POINTS_PER_WINDOW
        step = config.WINDOW_STEP
        total_len = len(values)

        if total_len < win_len:
            return np.array([]), np.array([]), self.threshold_df

        # BUG FIX: +1 so the last complete window (starting exactly at
        # total_len - win_len) is included; the original range dropped it.
        all_start_indices = list(range(0, total_len - win_len + 1, step))

        # Split indices into chunks; ~4 chunks per worker keeps the pool busy.
        num_chunks = self.n_jobs_cpu * 4
        chunk_size = max(1, len(all_start_indices) // num_chunks)
        chunks = [
            all_start_indices[i:i + chunk_size]
            for i in range(0, len(all_start_indices), chunk_size)
        ]

        chunk_results = Parallel(n_jobs=self.n_jobs_cpu, backend="loky", verbose=5)(
            delayed(_calculate_window_chunk)(
                chunk, values, win_len, config.VALID_DATA_RATIO
            )
            for chunk in chunks
        )

        # Flatten chunk results back into window order.
        window_scores = [item for sublist in chunk_results for item in sublist]
        final_scores = np.nan_to_num(
            np.array(window_scores, dtype=np.float32), nan=0.0
        )

        # 5. Chronological train/test split.
        total_samples = final_scores.shape[0]
        if total_samples == 0:
            raise ValueError("窗口数量为0")

        split_idx = int(total_samples * config.TRAIN_TEST_SPLIT)
        train_scores = final_scores[:split_idx]
        test_scores = final_scores[split_idx:]

        print(f"处理完成: 训练集 {len(train_scores)} | 测试集 {len(test_scores)}")
        return train_scores, test_scores, self.threshold_df