| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
# -*- coding: utf-8 -*-
"""
data_processing.py: Layer 1 - anomaly quantification.

Transforms massive, raw sensor time-series data (possibly containing missing
values) into a standardized 0-1 anomaly score matrix the system can consume.
Covers downsampling, dual anomaly scoring, and sliding-window aggregation.
"""
import gc
import multiprocessing
import os

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from tqdm import tqdm

from config import config
- def _process_single_file_task(file_idx, file_path, sensor_list, target_interval):
- """
- 单个文件的读取与降采样任务(运行在子进程中)
- """
- if not os.path.exists(file_path):
- return None
-
- try:
- # 1. 快速读取
- df_temp = pd.read_csv(file_path, index_col=None, low_memory=False)
- if df_temp.empty:
- return None
-
- # 2. 时间解析 (mixed格式)
- time_col = df_temp.columns[0]
- df_temp[time_col] = pd.to_datetime(
- df_temp[time_col],
- format='mixed',
- errors='coerce'
- )
- df_temp = df_temp.dropna(subset=[time_col]).set_index(time_col)
-
- # 3. 筛选列
- valid_cols = [c for c in df_temp.columns if c in sensor_list]
- if not valid_cols:
- return None
-
- df_valid = df_temp[valid_cols]
-
- # 4. 立即降采样 (4s -> 20s) & 类型转换
- df_resampled = df_valid.resample(f"{target_interval}s").mean()
- df_resampled = df_resampled.astype(np.float32)
-
- return df_resampled
-
- except Exception as e:
- print(f"文件 {file_idx} 处理出错: {e}")
- return None
- def _calculate_window_chunk(start_indices, values, win_len, valid_ratio, threshold_val=0.95):
- """
- 窗口计算任务块(运行在子进程中)
- 将连续的时序点打包成窗口(例如 40分钟=120个点),计算该窗口内的综合异常程度。
- """
- chunk_results = []
-
- for start in start_indices:
- win_data = values[start : start + win_len, :]
-
- # 向量化计算窗口内的数据有效性(非 NaN 数据的比例)
- valid_counts = np.sum(~np.isnan(win_data), axis=0)
- valid_ratios = valid_counts / win_len
-
- win_res = np.zeros(win_data.shape[1], dtype=np.float32)
- valid_mask = valid_ratios >= valid_ratio
-
- if np.any(valid_mask):
- # 取窗口内的 95 分位数作为该窗口的代表异常分,过滤掉偶发的极端孤立噪点
- quantile_scores = np.nanquantile(win_data[:, valid_mask], threshold_val, axis=0)
- win_res[valid_mask] = quantile_scores
-
- # 限制分数在 0~1 之间
- chunk_results.append(np.clip(win_res, 0, 1))
-
- return chunk_results
class DataAnomalyProcessor:
    """
    Layer 1 pipeline: converts raw sensor CSV files into 0-1 anomaly score
    windows (parallel read/downsample -> point-level dual scoring ->
    parallel window aggregation -> train/test split).
    """

    def __init__(self):
        # The threshold table drives both validity filtering and scoring.
        self.threshold_path = config.THRESHOLD_FILENAME
        self.threshold_df = self._load_thresholds()
        self.sensor_list = self.threshold_df['ID'].tolist()
        self.threshold_dict = self.threshold_df.set_index('ID').to_dict('index')

        # Parallelism levels. NOTE: hard-coded, NOT derived from the actual
        # CPU count — tune these to the deployment machine.
        self.n_jobs_io = 24    # file reading / resampling (I/O bound)
        self.n_jobs_cpu = 64   # window quantile computation (CPU bound)

    def _load_thresholds(self):
        """Load and normalize the sensor threshold table (Excel).

        Returns the table with required columns case-normalized, limit
        columns numeric (missing limits become +/-inf = unbounded), and
        'Alarm_time' guaranteed present (default 60).

        Raises:
            ValueError: if the file cannot be read or a required column is
                missing even after case-insensitive renaming.
        """
        try:
            df = pd.read_excel(self.threshold_path)
        except Exception as e:
            raise ValueError(f"读取阈值表失败: {e}")

        required_cols = ['ID', 'Direction', 'Hard_min', 'Hard_max', 'Good_min', 'Good_max']
        for col in required_cols:
            if col not in df.columns:
                # Tolerate case differences in the spreadsheet header.
                for c in df.columns:
                    if c.lower() == col.lower():
                        df.rename(columns={c: col}, inplace=True)
                        break

        # Fail fast with a clear message instead of an obscure KeyError later.
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            raise ValueError(f"阈值表缺少必需列: {missing}")

        # BUGFIX: 'Alarm_time' may be absent entirely; create it so the
        # unconditional fillna below cannot raise KeyError.
        if 'Alarm_time' not in df.columns:
            df['Alarm_time'] = np.nan

        cols_to_numeric = ['Hard_min', 'Hard_max', 'Good_min', 'Good_max', 'Alarm_time']
        for c in cols_to_numeric:
            df[c] = pd.to_numeric(df[c], errors='coerce')

        # A missing hard/good limit means "unbounded" in that direction.
        df['Hard_min'] = df['Hard_min'].fillna(-np.inf)
        df['Hard_max'] = df['Hard_max'].fillna(np.inf)
        df['Good_min'] = df['Good_min'].fillna(-np.inf)
        df['Good_max'] = df['Good_max'].fillna(np.inf)
        df['Alarm_time'] = df['Alarm_time'].fillna(60)
        return df

    def _calculate_point_score_vectorized(self, series, sensor_name):
        """
        Per-point anomaly score: weighted blend of an absolute-threshold
        score and a dynamic MAD (median absolute deviation) score.

        Returns a float32 Series aligned with ``series``. Points outside the
        physical hard limits become NaN (treated as sensor failure); unknown
        sensors score 0 everywhere.
        """
        if sensor_name not in self.threshold_dict:
            return pd.Series(0.0, index=series.index, dtype=np.float32)

        info = self.threshold_dict[sensor_name]
        vals = series.astype(np.float32)  # astype already copies

        # 1. Physical hard limits: readings beyond them are invalid/faulty.
        mask_invalid = (vals < info['Hard_min']) | (vals > info['Hard_max'])
        vals[mask_invalid] = np.nan

        # All-NaN input: nothing to score.
        if vals.isna().all():
            return pd.Series(0.0, index=vals.index, dtype=np.float32)

        # ==================== (A) absolute-threshold score ====================
        abs_scores = pd.Series(0.0, index=vals.index, dtype=np.float32)
        direction = str(info['Direction']).strip().lower()

        # Low-side anomaly: linear 0..1 ramp between Good_min and Hard_min.
        # NOTE(review): when Hard_min is -inf the denominator is inf and the
        # low-side score degenerates to 0 — confirm this is intended.
        if direction in ['low', 'both']:
            mask_low = (vals < info['Good_min']) & (vals >= info['Hard_min'])
            denom = info['Good_min'] - info['Hard_min']
            if denom > 1e-5:
                abs_scores[mask_low] = (info['Good_min'] - vals[mask_low]) / denom
            else:
                abs_scores[mask_low] = 1.0

        # High-side anomaly: linear ramp between Good_max and Hard_max.
        if direction in ['high', 'both']:
            mask_high = (vals > info['Good_max']) & (vals <= info['Hard_max'])
            denom = info['Hard_max'] - info['Good_max']
            if denom > 1e-5:
                abs_scores[mask_high] = (vals[mask_high] - info['Good_max']) / denom
            else:
                abs_scores[mask_high] = 1.0

        # Faster required alarm response -> stronger score amplification.
        alarm_t = max(info['Alarm_time'], 1.0)
        time_weight = 1.0 + (30.0 / alarm_t)
        abs_scores = (abs_scores * time_weight).clip(0, 1)

        # ==================== (B) dynamic MAD score ====================
        # MAD captures sudden drops/spikes without hand-tuned thresholds.
        # Rolling median of the recent history acts as a dynamic baseline.
        rolling_median = vals.rolling(
            window=config.MAD_HISTORY_WINDOW,
            min_periods=1
        ).median()

        # Absolute deviation |x_i - Median|.
        abs_deviation = (vals - rolling_median).abs()

        # MAD = Median(|x_i - Median|).
        rolling_mad = abs_deviation.rolling(
            window=config.MAD_HISTORY_WINDOW,
            min_periods=1
        ).median()

        # Map the deviation into 0..1 via (MAD_THRESHOLD * MAD);
        # the 1e-5 guards against division by zero.
        dyn_scores = (abs_deviation / (rolling_mad * config.MAD_THRESHOLD + 1e-5)).clip(0, 1)

        # ==================== (C) weighted blend ====================
        final_scores = (config.ABSOLUTE_SCORE_WEIGHT * abs_scores) + (config.DYNAMIC_SCORE_WEIGHT * dyn_scores)

        # Restore NaN for physically invalid points.
        final_scores[mask_invalid] = np.nan
        return final_scores

    def _read_all_files(self):
        """Read, downsample, and merge all sensor CSVs in parallel.

        Raises:
            ValueError: when no file yields any usable data.
        """
        file_indices = list(range(config.SENSOR_FILE_NUM_RANGE[0], config.SENSOR_FILE_NUM_RANGE[1] + 1))
        file_paths = [os.path.join(config.DATASET_SENSOR_DIR, f"{config.SENSOR_FILE_PREFIX}{i}.csv") for i in file_indices]

        print(f"正在并行读取 {len(file_indices)} 个文件 (并发数: {self.n_jobs_io})...")
        results = Parallel(n_jobs=self.n_jobs_io, backend="loky", verbose=5)(
            delayed(_process_single_file_task)(
                idx, path, self.sensor_list, config.TARGET_SAMPLE_INTERVAL
            ) for idx, path in zip(file_indices, file_paths)
        )

        frames = [r for r in results if r is not None]
        if not frames:
            raise ValueError("没有读取到有效数据")

        print("正在纵向合并数据...")
        merged = pd.concat(frames, axis=0).sort_index()
        # Overlapping files can produce duplicate timestamps; keep the first.
        merged = merged[~merged.index.duplicated(keep='first')]

        # Free the per-file frames before the next large allocation.
        del frames, results
        gc.collect()
        return merged

    def _score_points(self, resampled_df):
        """Compute the point-level score matrix, columns ordered as sensor_list."""
        print("计算点级异常得分...")
        scores_dict = {}
        for sensor in tqdm(self.sensor_list, desc="计算得分"):
            if sensor in resampled_df.columns:
                scores_dict[sensor] = self._calculate_point_score_vectorized(resampled_df[sensor], sensor)
            else:
                # Sensor absent from the data: all-NaN column keeps alignment.
                scores_dict[sensor] = pd.Series(np.nan, index=resampled_df.index, dtype=np.float32)
        return pd.DataFrame(scores_dict)[self.sensor_list]

    def _aggregate_windows(self, values, win_len):
        """Aggregate point scores into per-window scores in parallel (CPU bound)."""
        print(f"正在并行计算窗口分位数 (并发数: {self.n_jobs_cpu})...")
        step = config.WINDOW_STEP

        # BUGFIX: '+ 1' includes the last full window; the original
        # range(0, total_len - win_len, step) silently dropped it.
        all_start_indices = list(range(0, len(values) - win_len + 1, step))

        # Split the indices into chunks, several per CPU worker.
        num_chunks = self.n_jobs_cpu * 4
        chunk_size = max(1, len(all_start_indices) // num_chunks)
        chunks = [all_start_indices[i:i + chunk_size] for i in range(0, len(all_start_indices), chunk_size)]

        chunk_results = Parallel(n_jobs=self.n_jobs_cpu, backend="loky", verbose=5)(
            delayed(_calculate_window_chunk)(
                chunk, values, win_len, config.VALID_DATA_RATIO
            ) for chunk in chunks
        )

        window_scores = [row for sublist in chunk_results for row in sublist]
        return np.nan_to_num(np.array(window_scores, dtype=np.float32), nan=0.0)

    def process(self):
        """
        Main pipeline: read files -> downsample -> point scoring ->
        window aggregation -> dataset split.

        Returns:
            (train_scores, test_scores, threshold_df); the score arrays are
            empty when there is not enough data for a single window.

        Raises:
            ValueError: if no usable data is read or no window is produced.
        """
        print(f">>> [Step 1] 数据处理启动 | 检测到 CPU 核心数: {multiprocessing.cpu_count()}")

        # 1-2. Parallel read/downsample and vertical merge.
        resampled_df = self._read_all_files()

        # 3. Point-level anomaly scores.
        point_score_df = self._score_points(resampled_df)

        values = point_score_df.values
        win_len = config.POINTS_PER_WINDOW
        if len(values) < win_len:
            # Not even one full window: return empty splits, not an error.
            return np.array([]), np.array([]), self.threshold_df

        # 4. Parallel window aggregation.
        final_scores = self._aggregate_windows(values, win_len)

        # 5. Train/test split.
        total_samples = final_scores.shape[0]
        if total_samples == 0:
            raise ValueError("窗口数量为0")

        split_idx = int(total_samples * config.TRAIN_TEST_SPLIT)
        train_scores = final_scores[:split_idx]
        test_scores = final_scores[split_idx:]

        print(f"处理完成: 训练集 {len(train_scores)} | 测试集 {len(test_scores)}")

        return train_scores, test_scores, self.threshold_df
|