| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472 |
- import pandas as pd
- import os
- from datetime import datetime, timedelta
- import numpy as np
- from functools import reduce
# =========================
# Global data path (single entry point for every input file)
# =========================
DATA_PATH = "../datasets/high_freq_cip"

# =========================
# Unit number -> unit name written into the output labels
# =========================
UNIT_MAP = {unit_no: f"RO{unit_no}" for unit_no in range(1, 5)}
# =========================
# Method 1: control-logic based detection (strong rule)
# =========================
def read_raw_signal(file_path, col_name):
    """
    Load a single-signal CSV and give its value column a standard name.

    The CSV must contain a ``time`` column plus exactly one other column that
    holds the signal values.

    Args:
        file_path: path of the CSV file.
        col_name: standard name for the value column (e.g. ``model``, ``qxb1``).

    Returns:
        DataFrame with columns ``['time', col_name]``; ``time`` is parsed to
        datetime.

    Raises:
        ValueError: if the file does not have exactly one non-``time`` column.
    """
    raw = pd.read_csv(file_path)
    raw['time'] = pd.to_datetime(raw['time'])

    data_columns = [name for name in raw.columns if name != 'time']
    if len(data_columns) != 1:
        raise ValueError(f"❌ 文件列结构异常(应只有1个数据列): {file_path} | 实际列: {data_columns}")

    (source_col,) = data_columns
    # Renaming to a standard field name lets the signals be merged by name later.
    return raw.loc[:, ['time', source_col]].rename(columns={source_col: col_name})
def read_method1_signals(unit_id):
    """
    Load every control-logic signal used by method 1 for one RO unit.

    Signals (one CSV each under ``DATA_PATH``):
    - ``model``: run-mode word of unit ``unit_id``
    - ``qxb1`` / ``qxb2``: CIP pump run state (same files for every unit)
    - ``jsf1`` / ``jsf2``: CIP valve open feedback of unit ``unit_id``

    Each signal is logged independently (not a continuous record). Files that
    do not exist are skipped silently, so fewer than five frames (possibly
    none) may be returned.

    Returns:
        list of DataFrames with columns ``['time', <signal name>]`` in the
        order model, qxb1, qxb2, jsf1, jsf2, with missing files omitted.
    """
    signal_files = (
        ("model", f"{DATA_PATH}/C_M_RO{unit_id}_DB_model_word_raw.csv"),
        ("qxb1", f"{DATA_PATH}/C_M_CIP_QXB1_run_raw.csv"),
        ("qxb2", f"{DATA_PATH}/C_M_CIP_QXB2_run_raw.csv"),
        ("jsf1", f"{DATA_PATH}/C_M_RO{unit_id}_CIP_JSF1_open_feedback_raw.csv"),
        ("jsf2", f"{DATA_PATH}/C_M_RO{unit_id}_CIP_JSF2_open_feedback_raw.csv"),
    )
    return [
        read_raw_signal(csv_path, signal_name)
        for signal_name, csv_path in signal_files
        if os.path.exists(csv_path)
    ]
def merge_signals(dfs):
    """
    Align several signal frames on one time axis and forward-fill the gaps.

    The frames are outer-joined pairwise on ``time``, so the result holds
    every timestamp seen in any input. Rows are then sorted by time and each
    column is forward-filled. This turns sparse "value holds until changed"
    records into a step signal. Values before a signal's first record stay NaN.

    Key assumption: a control signal keeps its last value until it is updated
    (common for industrial event logs).

    Args:
        dfs: non-empty sequence of DataFrames that each have a ``time`` column
            (``functools.reduce`` raises TypeError on an empty sequence).

    Returns:
        merged, time-sorted, forward-filled DataFrame.
    """
    def _outer_on_time(acc, nxt):
        return acc.merge(nxt, on='time', how='outer')

    combined = reduce(_outer_on_time, dfs)
    return combined.sort_values('time').ffill()
def get_method1_dates(unit_id):
    """
    Method 1: flag CIP days from the control logic.

    A timestamp of the merged, forward-filled signal table satisfies the rule
    when
        model == 0
        AND (qxb1 == 1 OR qxb2 == 1)
        AND (jsf1 == 1 OR jsf2 == 1)
    A day is a CIP day if the rule holds at any timestamp of that day.

    Signals whose CSV file is missing are treated as unknown (NaN columns).
    NaN never satisfies ``== 0`` / ``== 1``, so a missing signal can only
    weaken the rule; it no longer raises KeyError.

    Args:
        unit_id: RO unit number.

    Returns:
        set of ``datetime.date`` flagged as CIP days (empty when no signal
        file exists).
    """
    print(f"方法1处理 {unit_id} 号机组...")
    dfs = read_method1_signals(unit_id)
    if not dfs:
        return set()

    df = merge_signals(dfs)
    # read_method1_signals skips missing files. Without this step the rule
    # below would raise KeyError whenever any single signal column is absent.
    for signal in ("model", "qxb1", "qxb2", "jsf1", "jsf2"):
        if signal not in df.columns:
            df[signal] = np.nan

    df['cip_flag_m1'] = (
        (df['model'] == 0) &
        ((df['qxb1'] == 1) | (df['qxb2'] == 1)) &
        ((df['jsf1'] == 1) | (df['jsf2'] == 1))
    )
    df['date'] = df['time'].dt.date

    # Satisfied at any timestamp -> the whole day counts as a CIP day.
    # NOTE(review): only timestamps at which some signal was recorded are
    # evaluated. A day covered entirely by a CIP state carried over from an
    # earlier record (with no new records that day) is therefore not flagged.
    daily_flag = df.groupby('date')['cip_flag_m1'].any()
    return set(daily_flag[daily_flag].index)
# =========================
# Method 2: operating-behaviour based detection (weak rule)
# =========================
def read_control_word_data(unit_id):
    """
    Load the control-word log of one RO unit.

    Method 2 uses it to find abnormal operating segments.

    Args:
        unit_id: RO unit number.

    Returns:
        DataFrame sorted by ``time`` (parsed to datetime), with an extra
        ``date`` column holding each record's calendar date.
    """
    csv_path = f'{DATA_PATH}/C_M_RO{unit_id}_DB_control_word_raw.csv'
    frame = pd.read_csv(csv_path)
    frame['time'] = pd.to_datetime(frame['time'])
    ordered = frame.sort_values('time')
    return ordered.assign(date=ordered['time'].dt.date)
def read_pressure_data():
    """
    Load the merged 1-minute sensor table used for low-pressure detection.

    Low-pressure operation is the CIP signature that method 2 looks for.

    Returns:
        DataFrame sorted by ``time`` (parsed to datetime), with an extra
        ``date`` column holding each row's calendar date.
    """
    sensors = pd.read_csv(f'{DATA_PATH}/cip_sensors_1min_merged.csv')
    sensors['time'] = pd.to_datetime(sensors['time'])
    sensors = sensors.sort_values('time')
    return sensors.assign(date=sensors['time'].dt.date)
def _control_run_event(start, end, word, is_initial):
    """Return the event dict for an abnormal constant-word run, else None (> 180 min and word != 24.0)."""
    duration = (end - start).total_seconds() / 60
    if duration > 180 and word != 24.0 and word is not None:
        return {
            'start_time': start,
            'end_time': end,
            'duration_minutes': duration,
            'control_word': word,
            'is_initial_segment': is_initial,
        }
    return None


def find_control_word_events(df, unit_id):
    """
    Detect abnormal control-word segments: control word != 24 held for > 3 h.

    The control-word log is treated as a step signal: each value holds until
    a record with a different value arrives. Every calendar date present in
    ``df`` is scanned on its own and cut into runs of constant control word:

    * the run in force at 00:00, carried over from the last value of the
      previous date in ``df`` (assumed to be 0 for the first date);
    * one run per value change during the day;
    * the run still open after the last record, closed at 23:59:59.999999.

    Records that repeat the current value extend the current run, so a run
    is always measured from its first record.

    A run is reported when it lasts more than 180 minutes and its control
    word is not 24.0.

    NOTE(review): runs are split at midnight. An abnormal run that crosses
    midnight is judged as two shorter pieces and may be missed.

    Args:
        df: DataFrame with ``time`` (datetime), ``date`` (``time.dt.date``) and
            ``C.M.RO{unit_id}_DB@control_word`` columns.
        unit_id: RO unit number, used to build the control-word column name.

    Returns:
        (result_days, control_events_by_date):
        result_days -- ascending list of dates with at least one event;
        control_events_by_date -- date -> list of event dicts with keys
        ``start_time``, ``end_time``, ``duration_minutes``, ``control_word``
        and ``is_initial_segment`` (True for the run carried in at 00:00).
    """
    col = f'C.M.RO{unit_id}_DB@control_word'
    result_days = []
    control_events_by_date = {}

    # Control word assumed in force before the very first record.
    # NOTE(review): 0 != 24, so if the first date's first record comes after
    # 03:00 that date is reported as abnormal. Confirm this default is intended.
    carried_word = 0

    for current_date, day_data in df.groupby('date', sort=True):
        day_data = day_data.sort_values('time')
        day_start = datetime.combine(current_date, datetime.min.time())
        day_end = datetime.combine(current_date, datetime.max.time())  # 23:59:59.999999

        day_events = []
        run_word, run_start, run_is_initial = carried_word, day_start, True
        for rec_time, rec_word in zip(day_data['time'], day_data[col]):
            if rec_word != run_word:
                event = _control_run_event(run_start, rec_time, run_word, run_is_initial)
                if event is not None:
                    day_events.append(event)
                run_word, run_start, run_is_initial = rec_word, rec_time, False

        # Close the still-open run at the end of the day. It started at
        # run_start, not at the last record: re-logged identical values must
        # not shorten the measured duration.
        event = _control_run_event(run_start, day_end, run_word, run_is_initial)
        if event is not None:
            day_events.append(event)

        carried_word = run_word  # equals the value of the day's last record
        if day_events:
            result_days.append(current_date)
            control_events_by_date[current_date] = day_events

    return result_days, control_events_by_date
def _low_pressure_event(day_data, pressure_col, start, end):
    """Return the event dict for the valid run [start, end], or None if it lasts < 20 minutes."""
    duration = (end - start).total_seconds() / 60 + 1
    if duration < 20:
        return None
    in_run = (
        (day_data['time'] >= start)
        & (day_data['time'] <= end)
        & day_data['pressure_valid']
    )
    return {
        'start_time': start,
        'end_time': end,
        'duration_minutes': duration,
        'avg_pressure': day_data.loc[in_run][pressure_col].mean(),
    }


def find_pressure_events(pressure_df, target_dates, unit_id):
    """
    Detect low-pressure runs (a CIP signature) on the given dates.

    A sample is valid when ``C.M.RO{unit_id}_PT_JS@out`` lies in [0.05, 0.5]
    (inclusive; NaN counts as invalid). Consecutive valid samples form a run.
    Up to 5 invalid samples inside a run are tolerated, and the 6th closes it.
    A run spans its first to its last valid sample. Its duration is
    ``(end - start)`` in minutes plus 1 (each sample counted as one minute).
    Runs shorter than 20 minutes are discarded.

    NOTE(review): the gap tolerance counts rows, not minutes. It equals
    5 minutes only when rows are 1 minute apart with none missing.

    Args:
        pressure_df: DataFrame with ``time``, ``date`` and the pressure column.
        target_dates: dates to examine (dates without rows are skipped).
        unit_id: RO unit number, used to build the pressure column name.

    Returns:
        dict date -> list of event dicts with ``start_time``, ``end_time``,
        ``duration_minutes`` and ``avg_pressure`` (mean of the valid samples
        in the run). Dates without a qualifying run are omitted.
    """
    pressure_col = f'C.M.RO{unit_id}_PT_JS@out'
    pressure_events = {}

    for day in target_dates:
        day_data = pressure_df[pressure_df['date'] == day].copy()
        if day_data.empty:
            continue
        day_data = day_data.sort_values('time')
        day_data['pressure_valid'] = day_data[pressure_col].between(0.05, 0.5)

        found = []
        run_start = run_end = None
        misses = 0
        for stamp, is_valid in zip(day_data['time'], day_data['pressure_valid']):
            if is_valid:
                if run_start is None:
                    run_start = stamp
                run_end = stamp
                misses = 0
            elif run_start is not None:
                misses += 1
                if misses > 5:
                    event = _low_pressure_event(day_data, pressure_col, run_start, run_end)
                    if event is not None:
                        found.append(event)
                    run_start = None
                    misses = 0

        # A run still open at the end of the day's data is closed here.
        if run_start is not None:
            event = _low_pressure_event(day_data, pressure_col, run_start, run_end)
            if event is not None:
                found.append(event)

        if found:
            pressure_events[day] = found

    return pressure_events
def match_events(control_events, pressure_events):
    """
    Pair abnormal control-word events with low-pressure events that overlap.

    Only a time overlap of both kinds of anomaly is treated as CIP behaviour.

    Args:
        control_events: list of dicts with ``start_time`` / ``end_time``.
        pressure_events: list of dicts with ``start_time`` / ``end_time``.

    Returns:
        list of dicts (``overlap_start``, ``overlap_end``,
        ``overlap_duration_minutes``), one per (control, pressure) pair whose
        intervals overlap by a positive amount. Pairs are ordered by control
        event, then by pressure event.
    """
    pairs = ((ctrl, press) for ctrl in control_events for press in pressure_events)
    overlaps = []
    for ctrl, press in pairs:
        lo = max(ctrl['start_time'], press['start_time'])
        hi = min(ctrl['end_time'], press['end_time'])
        if not lo < hi:
            continue
        minutes = (hi - lo).total_seconds() / 60
        if minutes > 0:
            overlaps.append({
                'overlap_start': lo,
                'overlap_end': hi,
                'overlap_duration_minutes': minutes,
            })
    return overlaps
def get_method2_dates(unit_id):
    """
    Method 2: flag CIP days from operating behaviour.

    A day qualifies when it has at least one abnormal control-word segment
    (see ``find_control_word_events``) that overlaps in time with at least
    one low-pressure event (see ``find_pressure_events``) on the same day.

    Args:
        unit_id: RO unit number.

    Returns:
        set of dates meeting both conditions with a time overlap.
    """
    print(f"方法2处理 {unit_id} 号机组...")
    control_df = read_control_word_data(unit_id)
    candidate_days, control_by_day = find_control_word_events(control_df, unit_id)
    # Pressure events are only searched on days that already have a control anomaly.
    pressure_by_day = find_pressure_events(read_pressure_data(), candidate_days, unit_id)
    return {
        day
        for day in candidate_days
        if day in pressure_by_day
        and match_events(control_by_day.get(day, []), pressure_by_day[day])
    }
# =========================
# Fusion layer
# =========================
def label_dates(method1_dates, method2_dates, unit_id):
    """
    Fuse the day sets of both methods into one labelled table.

    Labels:
        0 -> detected by both methods (high confidence)
        1 -> detected by method 1 only
        2 -> detected by method 2 only

    Args:
        method1_dates: set of dates flagged by method 1.
        method2_dates: set of dates flagged by method 2.
        unit_id: value written to the ``unit_id`` column (``analyze_unit``
            passes the unit name, e.g. ``"RO1"``).

    Returns:
        DataFrame with columns ``date``, ``label``, ``unit_id``: one row per
        date in the union of both sets, in ascending date order. The columns
        exist even when both sets are empty.
    """
    rows = []
    for d in sorted(method1_dates | method2_dates):
        in_m1 = d in method1_dates
        in_m2 = d in method2_dates
        if in_m1 and in_m2:
            label = 0
        elif in_m1:
            label = 1
        else:
            label = 2
        rows.append({"date": d, "label": label, "unit_id": unit_id})
    # Explicit columns keep the schema stable for empty input. pd.DataFrame([])
    # has no columns at all, so downstream column access would raise KeyError.
    return pd.DataFrame(rows, columns=["date", "label", "unit_id"])
def analyze_unit(unit_id):
    """
    Run the full labelling pipeline for one RO unit.

    Method 1 and method 2 are computed independently (in that order), and
    their date sets are fused by ``label_dates``. The resulting ``unit_id``
    column holds the unit name from ``UNIT_MAP`` (e.g. ``"RO1"``), not the
    numeric id.

    Args:
        unit_id: RO unit number (a key of ``UNIT_MAP``).

    Returns:
        label DataFrame produced by ``label_dates``.
    """
    unit_name = UNIT_MAP[unit_id]
    print(f"\n处理 {unit_name} 号机组...")
    return label_dates(
        get_method1_dates(unit_id),
        get_method2_dates(unit_id),
        unit_name,
    )
def main():
    """
    Label CIP days for every unit in ``UNIT_MAP`` and save them to one CSV.

    Output file: ``../use_data/cip_day_labels_all_units.csv`` with columns
    ``date``, ``label``, ``unit_id``, sorted by unit and then by date. The
    output directory is created if it does not exist.
    """
    all_results = [analyze_unit(unit_id) for unit_id in sorted(UNIT_MAP)]

    # reindex guarantees the expected columns even if every unit produced an
    # empty, column-less label frame (concat would then lack "unit_id").
    final_df = pd.concat(all_results, ignore_index=True).reindex(
        columns=["date", "label", "unit_id"]
    )
    final_df["unit_id"] = final_df["unit_id"].astype(str)
    final_df = final_df.sort_values(["unit_id", "date"])

    save_path = "../use_data/cip_day_labels_all_units.csv"
    # DataFrame.to_csv does not create missing parent directories.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    final_df.to_csv(save_path, index=False)
    print(f"\n按天CIP标签已保存到 {save_path}")


if __name__ == "__main__":
    main()
|