"""Label CIP (clean-in-place) days per RO unit from high-frequency signals.

Two independent detectors are run for every unit and then fused:

* Method 1 (strong rule): control-logic signals (mode word, CIP pump run
  flags, valve open feedback) are combined per timestamp.
* Method 2 (weak rule): long runs of a non-normal control word are matched
  against low-pressure periods by time overlap.

The fused day-level labels for all units are written to one CSV file.
"""

import os
from datetime import datetime, timedelta
from functools import reduce

import numpy as np
import pandas as pd

# =========================
# Global data path (single entry point)
# =========================
DATA_PATH = "../datasets/high_freq_cip"

# =========================
# Global data format
# =========================
UNIT_MAP = {
    1: "RO1",
    2: "RO2",
    3: "RO3",
    4: "RO4"
}


# =========================
# Method 1: control logic (strong rule)
# =========================
def read_raw_signal(file_path, col_name):
    """Read a single-variable CSV and rename its value column.

    The file must contain a ``time`` column plus exactly one other column.

    Args:
        file_path: Path of the CSV file.
        col_name: Standard name given to the value column (e.g. ``model``,
            ``qxb1``).

    Returns:
        DataFrame with the columns ``time`` (parsed to datetime) and
        ``col_name``.

    Raises:
        ValueError: If the file does not have exactly one non-``time`` column.
    """
    df = pd.read_csv(file_path)
    df['time'] = pd.to_datetime(df['time'])

    # Locate the only data column besides ``time``.
    value_cols = [c for c in df.columns if c != 'time']
    if len(value_cols) != 1:
        raise ValueError(f"❌ 文件列结构异常(应只有1个数据列): {file_path} | 实际列: {value_cols}")
    value_col = value_cols[0]

    # Renaming to the unified field name is what makes the later merge work.
    return df[['time', value_col]].rename(columns={value_col: col_name})


def read_method1_signals(unit_id):
    """Read every control-logic signal that method 1 needs for one unit.

    The signals are the mode word (``model``), the CIP pump run flags
    (``qxb1``/``qxb2``) and the valve open feedback (``jsf1``/``jsf2``).
    Each signal lives in its own CSV; files that do not exist are skipped.

    Args:
        unit_id: Numeric unit id used to build the per-unit file names.

    Returns:
        List of two-column DataFrames (``time`` + signal name), one per
        file that exists. May be empty.
    """
    base_path = DATA_PATH
    files = {
        "model": f"{base_path}/C_M_RO{unit_id}_DB_model_word_raw.csv",
        "qxb1": f"{base_path}/C_M_CIP_QXB1_run_raw.csv",
        "qxb2": f"{base_path}/C_M_CIP_QXB2_run_raw.csv",
        "jsf1": f"{base_path}/C_M_RO{unit_id}_CIP_JSF1_open_feedback_raw.csv",
        "jsf2": f"{base_path}/C_M_RO{unit_id}_CIP_JSF2_open_feedback_raw.csv",
    }
    return [
        read_raw_signal(path, key)
        for key, path in files.items()
        if os.path.exists(path)
    ]


def merge_signals(dfs):
    """Align several signals on a common time axis and forward-fill them.

    The frames are outer-joined on ``time``, sorted by time and then
    forward-filled, i.e. a signal is assumed to hold its last value until
    the next record (key assumption of this method).

    Args:
        dfs: List of DataFrames that each contain a ``time`` column.

    Returns:
        The merged, time-sorted, forward-filled DataFrame. An empty frame
        with only a ``time`` column is returned for an empty input list.
    """
    if not dfs:
        # reduce() without an initial value raises TypeError on empty input.
        return pd.DataFrame(columns=['time'])

    df_merged = reduce(
        lambda left, right: pd.merge(left, right, on='time', how='outer'),
        dfs
    )
    df_merged = df_merged.sort_values('time')
    df_merged = df_merged.ffill()
    return df_merged


def _signal_equals(df, column, value):
    """Return ``df[column] == value``, or all-False if the column is absent."""
    if column not in df.columns:
        return pd.Series(False, index=df.index)
    return df[column] == value


def get_method1_dates(unit_id):
    """Return the dates on which the control logic indicates a CIP.

    Per-timestamp condition::

        model == 0 AND (qxb1 == 1 OR qxb2 == 1) AND (jsf1 == 1 OR jsf2 == 1)

    A day is a CIP day as soon as one timestamp of that day satisfies the
    condition. A signal whose file is missing is treated as never
    satisfying its part of the condition (instead of raising ``KeyError``).

    Args:
        unit_id: Numeric unit id.

    Returns:
        Set of ``datetime.date`` objects; empty if no signal file exists.
    """
    print(f"方法1处理 {unit_id} 号机组...")

    dfs = read_method1_signals(unit_id)
    if not dfs:
        return set()

    df = merge_signals(dfs)

    df['cip_flag_m1'] = (
        _signal_equals(df, 'model', 0)
        & (_signal_equals(df, 'qxb1', 1) | _signal_equals(df, 'qxb2', 1))
        & (_signal_equals(df, 'jsf1', 1) | _signal_equals(df, 'jsf2', 1))
    )
    df['date'] = df['time'].dt.date

    # Any matching timestamp marks the whole day as a CIP day.
    daily_flag = df.groupby('date')['cip_flag_m1'].any()
    return set(daily_flag[daily_flag].index)


# =========================
# Method 2: operating behaviour (weak rule)
# =========================
def read_control_word_data(unit_id):
    """Read the control-word CSV of one unit.

    Args:
        unit_id: Numeric unit id used to build the file name.

    Returns:
        DataFrame sorted by ``time`` with an added ``date`` column.
    """
    file_path = f'{DATA_PATH}/C_M_RO{unit_id}_DB_control_word_raw.csv'
    df = pd.read_csv(file_path)
    df['time'] = pd.to_datetime(df['time'])
    df = df.sort_values('time')
    df['date'] = df['time'].dt.date
    return df


def read_pressure_data():
    """Read the merged pressure sensor CSV (1-minute data per file name).

    Returns:
        DataFrame sorted by ``time`` with an added ``date`` column.
    """
    file_path = f'{DATA_PATH}/cip_sensors_1min_merged.csv'
    df = pd.read_csv(file_path)
    df['time'] = pd.to_datetime(df['time'])
    df = df.sort_values('time')
    df['date'] = df['time'].dt.date
    return df


def find_control_word_events(df, unit_id, *, normal_control_word=24.0,
                             min_duration_minutes=180):
    """Detect long runs of a non-normal control word, day by day.

    Each day is split into three kinds of segments:

    * initial segment: midnight up to the first record, carrying the last
      control word of the previous day (assumed ``0`` on the first day);
    * middle segments: runs of a constant control word, closed whenever
      the value changes;
    * final segment: the run that is still open after the last record,
      extended to the end of the day.

    A segment becomes an event when it lasts longer than
    ``min_duration_minutes`` and its control word differs from
    ``normal_control_word``.

    Args:
        df: Control-word frame with ``time``, ``date`` and the column
            ``C.M.RO{unit_id}_DB@control_word``.
        unit_id: Numeric unit id used to build the column name.
        normal_control_word: Control word that means normal operation.
        min_duration_minutes: Strict lower bound of the event duration.

    Returns:
        Tuple ``(result_days, control_events_by_date)``: the sorted list
        of dates with at least one event, and a dict mapping each such
        date to its list of event dicts (``start_time``, ``end_time``,
        ``duration_minutes``, ``control_word``, ``is_initial_segment``).
    """
    control_col = f'C.M.RO{unit_id}_DB@control_word'

    result_days = []
    control_events_by_date = {}
    prev_day_last_control_word = None

    def is_event(duration, control_word):
        return duration > min_duration_minutes and control_word != normal_control_word

    # One grouping pass instead of a full boolean filter per day.
    for i, (current_date, day_data) in enumerate(df.groupby('date', sort=True)):
        day_data = day_data.sort_values('time')
        times = day_data['time'].tolist()
        words = day_data[control_col].tolist()

        day_events = []

        # ===== Initial segment carried over from the previous day =====
        first_segment_control_word = 0 if i == 0 else prev_day_last_control_word
        day_start = datetime.combine(current_date, datetime.min.time())
        first_segment_duration = (times[0] - day_start).total_seconds() / 60

        if (first_segment_control_word is not None
                and is_event(first_segment_duration, first_segment_control_word)):
            day_events.append({
                'start_time': day_start,
                'end_time': times[0],
                'duration_minutes': first_segment_duration,
                'control_word': first_segment_control_word,
                'is_initial_segment': True
            })

        # ===== Middle segments =====
        run_word = words[0]
        run_start = times[0]

        for current_time, current_control in zip(times[1:], words[1:]):
            if current_control == run_word:
                continue
            duration = (current_time - run_start).total_seconds() / 60
            if is_event(duration, run_word):
                day_events.append({
                    'start_time': run_start,
                    'end_time': current_time,
                    'duration_minutes': duration,
                    'control_word': run_word,
                    'is_initial_segment': False
                })
            run_word = current_control
            run_start = current_time

        # ===== Final segment =====
        # Measure from the start of the run that is still open, not from
        # the last record: with repeated samples of the same value the
        # last record is only the most recent sample of that run, and
        # measuring from it would hide a long run that lasts until
        # midnight. When every record is a change both are identical.
        last_control_word = words[-1]
        day_end = datetime.combine(current_date, datetime.max.time())
        last_segment_duration = (day_end - run_start).total_seconds() / 60

        if is_event(last_segment_duration, last_control_word):
            day_events.append({
                'start_time': run_start,
                'end_time': day_end,
                'duration_minutes': last_segment_duration,
                'control_word': last_control_word,
                'is_initial_segment': False
            })

        prev_day_last_control_word = last_control_word

        if day_events:
            result_days.append(current_date)
            control_events_by_date[current_date] = day_events

    return result_days, control_events_by_date


def _build_pressure_event(day_data, pressure_col, event_start, event_end,
                          min_duration_minutes):
    """Build one low-pressure event dict, or None if the span is too short."""
    # +1 so that a span of N consecutive 1-minute samples counts N minutes.
    duration = (event_end - event_start).total_seconds() / 60 + 1
    if duration < min_duration_minutes:
        return None

    in_event = (
        (day_data['time'] >= event_start)
        & (day_data['time'] <= event_end)
        & day_data['pressure_valid']
    )
    return {
        'start_time': event_start,
        'end_time': event_end,
        'duration_minutes': duration,
        'avg_pressure': day_data.loc[in_event][pressure_col].mean()
    }


def find_pressure_events(pressure_df, target_dates, unit_id, *,
                         min_pressure=0.05, max_pressure=0.5,
                         min_duration_minutes=20, max_gap_points=5):
    """Detect low-pressure events on the given dates.

    A row is "valid" when its pressure lies in
    ``[min_pressure, max_pressure]``. Valid rows are chained into one
    event; up to ``max_gap_points`` consecutive invalid rows are tolerated
    inside an event. An event is kept when it spans at least
    ``min_duration_minutes`` (first valid row to last valid row, plus one).

    Args:
        pressure_df: Pressure frame with ``time``, ``date`` and the column
            ``C.M.RO{unit_id}_PT_JS@out``.
        target_dates: Iterable of dates to inspect.
        unit_id: Numeric unit id used to build the column name.
        min_pressure: Inclusive lower pressure bound.
        max_pressure: Inclusive upper pressure bound.
        min_duration_minutes: Minimum event duration (inclusive).
        max_gap_points: Number of consecutive invalid rows tolerated.

    Returns:
        Dict mapping each date that has at least one event to its list of
        event dicts (``start_time``, ``end_time``, ``duration_minutes``,
        ``avg_pressure``).
    """
    pressure_col = f'C.M.RO{unit_id}_PT_JS@out'
    pressure_events = {}

    for date in target_dates:
        day_data = pressure_df[pressure_df['date'] == date].copy()
        if len(day_data) == 0:
            continue

        day_data = day_data.sort_values('time')
        day_data['pressure_valid'] = day_data[pressure_col].between(
            min_pressure, max_pressure
        )

        events = []
        current_streak = 0
        gap_count = 0
        event_start = None
        event_end = None

        for time_point, is_valid in zip(day_data['time'], day_data['pressure_valid']):
            if is_valid:
                if current_streak == 0:
                    event_start = time_point
                current_streak += 1
                gap_count = 0
                event_end = time_point
            elif current_streak > 0:
                gap_count += 1
                if gap_count > max_gap_points:
                    event = _build_pressure_event(
                        day_data, pressure_col, event_start, event_end,
                        min_duration_minutes
                    )
                    if event is not None:
                        events.append(event)
                    current_streak = 0
                    gap_count = 0

        # Close the run that is still open at the end of the day.
        if current_streak > 0:
            event = _build_pressure_event(
                day_data, pressure_col, event_start, event_end,
                min_duration_minutes
            )
            if event is not None:
                events.append(event)

        if events:
            pressure_events[date] = events

    return pressure_events


def match_events(control_events, pressure_events):
    """Return the time overlaps between control and pressure events.

    Only a control-word event and a low-pressure event that overlap in
    time are regarded as CIP behaviour.

    Args:
        control_events: Event dicts with ``start_time`` and ``end_time``.
        pressure_events: Event dicts with ``start_time`` and ``end_time``.

    Returns:
        List of dicts (``overlap_start``, ``overlap_end``,
        ``overlap_duration_minutes``), one per pair with a non-empty
        overlap, in control-major order.
    """
    matched_results = []

    for control_event in control_events:
        for pressure_event in pressure_events:
            overlap_start = max(control_event['start_time'], pressure_event['start_time'])
            overlap_end = min(control_event['end_time'], pressure_event['end_time'])

            # A strictly positive overlap implies a positive duration.
            if overlap_start < overlap_end:
                matched_results.append({
                    'overlap_start': overlap_start,
                    'overlap_end': overlap_end,
                    'overlap_duration_minutes':
                        (overlap_end - overlap_start).total_seconds() / 60
                })

    return matched_results


def get_method2_dates(unit_id):
    """Return the dates with overlapping control-word and pressure events.

    Args:
        unit_id: Numeric unit id.

    Returns:
        Set of dates on which at least one control-word event overlaps a
        low-pressure event.
    """
    print(f"方法2处理 {unit_id} 号机组...")

    control_df = read_control_word_data(unit_id)
    target_dates, control_events_by_date = find_control_word_events(control_df, unit_id)

    pressure_df = read_pressure_data()
    pressure_events_by_date = find_pressure_events(pressure_df, target_dates, unit_id)

    return {
        date
        for date in target_dates
        if date in pressure_events_by_date
        and match_events(
            control_events_by_date.get(date, []),
            pressure_events_by_date[date]
        )
    }


# =========================
# Fusion layer
# =========================
def label_dates(method1_dates, method2_dates, unit_id):
    """Fuse the two date sets into one labelled table.

    Labels:
        0 -> detected by both methods (high confidence)
        1 -> detected by method 1 only
        2 -> detected by method 2 only

    Args:
        method1_dates: Set of dates from method 1.
        method2_dates: Set of dates from method 2.
        unit_id: Value stored in the ``unit_id`` column of every row.

    Returns:
        DataFrame with the columns ``date``, ``label`` and ``unit_id``,
        sorted by date. The columns are present even when there are no
        rows, so downstream column access keeps working.
    """
    results = []
    for d in sorted(method1_dates | method2_dates):
        if d in method1_dates and d in method2_dates:
            label = 0
        elif d in method1_dates:
            label = 1
        else:
            label = 2
        results.append({
            "date": d,
            "label": label,
            "unit_id": unit_id
        })

    return pd.DataFrame(results, columns=["date", "label", "unit_id"])


def analyze_unit(unit_id):
    """Run both methods for one unit and return the fused labels.

    Args:
        unit_id: Key of ``UNIT_MAP``.

    Returns:
        DataFrame produced by ``label_dates``; its ``unit_id`` column holds
        the unit name from ``UNIT_MAP``.
    """
    unit_name = UNIT_MAP[unit_id]
    print(f"\n处理 {unit_name} 号机组...")

    method1_dates = get_method1_dates(unit_id)
    method2_dates = get_method2_dates(unit_id)

    return label_dates(method1_dates, method2_dates, unit_name)


def main():
    """Label every unit in ``UNIT_MAP`` and write the combined CSV."""
    all_results = [analyze_unit(unit_id) for unit_id in sorted(UNIT_MAP)]

    final_df = pd.concat(all_results, ignore_index=True)
    final_df["unit_id"] = final_df["unit_id"].astype(str)
    final_df = final_df.sort_values(["unit_id", "date"])

    save_path = "../use_data/cip_day_labels_all_units.csv"
    # Create the target directory first so the results are not lost at
    # the very end because the folder is missing.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    final_df.to_csv(save_path, index=False)

    print(f"\n按天CIP标签已保存到 {save_path}")


if __name__ == "__main__":
    main()