"""Detect CIP (clean-in-place) intervals for RO units from control-word and pressure logs.

Pipeline per unit:
1. find long (> 3 h) control-word segments whose value is not 24.0;
2. on those dates, find feed-pressure events (pressure within 0.05-0.5 for >= 20 min);
3. keep the dates on which a control-word event and a pressure event overlap;
4. group those dates into CIP intervals with a 10-day look-ahead window.
"""
import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd


def _load_time_sorted_csv(file_path):
    """Read a CSV with a ``time`` column, parse and sort by it, and add a ``date`` column."""
    df = pd.read_csv(file_path)
    df['time'] = pd.to_datetime(df['time'])
    df = df.sort_values('time')
    df['date'] = df['time'].dt.date
    return df


def read_control_word_data(unit_id):
    """Read the control-word log of one unit, sorted by time with a ``date`` column."""
    file_path = f'datasets/RO_data_logs/high_freq_cip/C_M_RO{unit_id}_DB_control_word_raw.csv'
    return _load_time_sorted_csv(file_path)


def read_pressure_data():
    """Read the merged 1-minute sensor log, sorted by time with a ``date`` column."""
    file_path = 'datasets/RO_data_logs/high_freq_cip/cip_sensors_1min_merged.csv'
    return _load_time_sorted_csv(file_path)


def find_control_word_events(df, unit_id):
    """Find control-word segments longer than 3 h whose value is not 24.0.

    Each date present in ``df`` is analysed separately and split into:

    * an initial segment from 00:00:00 to the first record, carrying the last
      control word of the previous date in the data (0 for the very first date);
    * runs of constant control word between records;
    * a trailing segment from the start of the run still in progress at the
      last record up to the end of the day.

    Args:
        df: DataFrame with ``time``, ``date`` and
            ``C.M.RO{unit_id}_DB@control_word`` columns.
        unit_id: Unit number used to build the control-word column name.

    Returns:
        ``(result_days, control_events_by_date)``: the sorted list of dates
        having at least one event, and a dict mapping each such date to its
        list of event dicts (``start_time``, ``end_time``,
        ``duration_minutes``, ``control_word``, ``is_initial_segment``).
    """
    column = f'C.M.RO{unit_id}_DB@control_word'
    result_days = []
    control_events_by_date = {}
    # Last control word seen on the previous date that has data.
    prev_day_last_control_word = None

    # One pass over the frame instead of a boolean mask per date.
    for i, (current_date, day_data) in enumerate(df.groupby('date', sort=True)):
        day_data = day_data.sort_values('time')
        times = day_data['time'].tolist()
        words = day_data[column].tolist()
        day_events = []

        # Initial segment: 00:00:00 -> first record of the day.
        # The very first date has no predecessor and is assumed to start at 0.
        initial_word = 0 if i == 0 else prev_day_last_control_word
        day_start = datetime.combine(current_date, datetime.min.time())
        initial_duration = (times[0] - day_start).total_seconds() / 60
        if (initial_duration > 180 and initial_word != 24.0
                and initial_word is not None):
            day_events.append({
                'start_time': day_start,
                'end_time': times[0],
                'duration_minutes': initial_duration,
                'control_word': initial_word,
                'is_initial_segment': True
            })

        # Runs of constant control word; a run is closed when the value changes.
        run_word = words[0]
        run_start = times[0]
        for record_time, record_word in zip(times[1:], words[1:]):
            if record_word == run_word:
                continue
            duration = (record_time - run_start).total_seconds() / 60
            if duration > 180 and run_word != 24.0:
                day_events.append({
                    'start_time': run_start,
                    'end_time': record_time,
                    'duration_minutes': duration,
                    'control_word': run_word,
                    'is_initial_segment': False
                })
            run_word = record_word
            run_start = record_time

        # Trailing segment: measured from the start of the run in progress so
        # that repeated samples of the same value do not shorten it.
        last_control_word = words[-1]
        day_end = datetime.combine(current_date, datetime.max.time())  # 23:59:59.999999
        trailing_duration = (day_end - run_start).total_seconds() / 60
        if trailing_duration > 180 and last_control_word != 24.0:
            day_events.append({
                'start_time': run_start,
                'end_time': day_end,
                'duration_minutes': trailing_duration,
                'control_word': last_control_word,
                'is_initial_segment': False
            })

        prev_day_last_control_word = last_control_word

        if day_events:
            result_days.append(current_date)
            control_events_by_date[current_date] = day_events

    return result_days, control_events_by_date


def _build_pressure_event(day_data, pressure_column, event_start, event_end):
    """Return the event dict for one pressure streak, or None if shorter than 20 minutes."""
    # +1 accounts for the minute covered by the last valid sample.
    duration = (event_end - event_start).total_seconds() / 60 + 1
    if duration < 20:
        return None
    in_event = (
        (day_data['time'] >= event_start)
        & (day_data['time'] <= event_end)
        & day_data['pressure_valid']
    )
    return {
        'start_time': event_start,
        'end_time': event_end,
        'duration_minutes': duration,
        'avg_pressure': day_data.loc[in_event, pressure_column].mean()
    }


def find_pressure_events(pressure_df, target_dates, unit_id):
    """Find feed-pressure events on the given dates.

    A sample is valid when ``C.M.RO{unit_id}_PT_JS@out`` lies within
    [0.05, 0.5]. An event is a streak of valid samples that tolerates up to 5
    consecutive invalid rows; it is kept when it spans at least 20 minutes.

    Args:
        pressure_df: DataFrame with ``time``, ``date`` and the pressure column.
        target_dates: Dates to analyse; dates without data are skipped.
        unit_id: Unit number used to build the pressure column name.

    Returns:
        Dict mapping each date that has events to a list of event dicts
        (``start_time``, ``end_time``, ``duration_minutes``, ``avg_pressure``).
    """
    pressure_column = f'C.M.RO{unit_id}_PT_JS@out'
    pressure_events = {}

    for date in target_dates:
        day_data = pressure_df[pressure_df['date'] == date].copy()
        if len(day_data) == 0:
            continue

        day_data = day_data.sort_values('time')
        day_data['pressure_valid'] = day_data[pressure_column].between(0.05, 0.5)

        events = []
        in_event = False
        gap_count = 0
        event_start = None
        event_end = None

        for sample_time, is_valid in zip(day_data['time'], day_data['pressure_valid']):
            if is_valid:
                if not in_event:
                    event_start = sample_time
                    in_event = True
                gap_count = 0  # a valid sample closes any pending gap
                event_end = sample_time
            elif in_event:
                gap_count += 1
                if gap_count > 5:
                    # More than 5 invalid rows in a row: the event is over.
                    event = _build_pressure_event(
                        day_data, pressure_column, event_start, event_end)
                    if event is not None:
                        events.append(event)
                    in_event = False
                    gap_count = 0

        # Close the streak still open at the end of the day.
        if in_event:
            event = _build_pressure_event(
                day_data, pressure_column, event_start, event_end)
            if event is not None:
                events.append(event)

        if events:
            pressure_events[date] = events

    return pressure_events


def match_events(control_events, pressure_events):
    """Pair every control-word event with every pressure event it overlaps.

    Args:
        control_events: Event dicts with ``start_time``, ``end_time`` and
            ``control_word``.
        pressure_events: Event dicts with ``start_time``, ``end_time`` and
            ``avg_pressure``.

    Returns:
        List of dicts, one per overlapping pair, holding both intervals, the
        overlap interval and its length in minutes, the control word and the
        average pressure. Intervals that merely touch do not match.
    """
    matched_results = []
    for control_event in control_events:
        for pressure_event in pressure_events:
            overlap_start = max(control_event['start_time'], pressure_event['start_time'])
            overlap_end = min(control_event['end_time'], pressure_event['end_time'])
            if overlap_start >= overlap_end:
                continue  # no strictly positive intersection
            matched_results.append({
                'control_start': control_event['start_time'],
                'control_end': control_event['end_time'],
                'pressure_start': pressure_event['start_time'],
                'pressure_end': pressure_event['end_time'],
                'overlap_start': overlap_start,
                'overlap_end': overlap_end,
                'overlap_duration_minutes': (overlap_end - overlap_start).total_seconds() / 60,
                'control_word': control_event['control_word'],
                'avg_pressure': pressure_event['avg_pressure']
            })
    return matched_results


def calculate_total_matched_time(matched_events):
    """Sum the matched overlap time without double-counting overlapping intervals.

    Args:
        matched_events: Dicts with ``overlap_start`` and ``overlap_end``.

    Returns:
        ``(total_minutes, merged_intervals)``; ``(0, [])`` for empty input.
    """
    if not matched_events:
        return 0, []

    time_intervals = sorted(
        ((event['overlap_start'], event['overlap_end']) for event in matched_events),
        key=lambda interval: interval[0],
    )

    # Classic sweep: extend the current interval while the next one touches it.
    merged_intervals = []
    current_start, current_end = time_intervals[0]
    for start, end in time_intervals[1:]:
        if start <= current_end:
            current_end = max(current_end, end)
        else:
            merged_intervals.append((current_start, current_end))
            current_start, current_end = start, end
    merged_intervals.append((current_start, current_end))

    total_duration = 0
    for start, end in merged_intervals:
        total_duration += (end - start).total_seconds() / 60

    return total_duration, merged_intervals


def analyze_both_exist_but_no_match(control_events, pressure_events, date):
    """Build a textual report for a date having both event kinds but no overlap.

    Args:
        control_events: Non-empty list of control-word event dicts.
        pressure_events: Non-empty list of pressure event dicts.
        date: Not used by the function body; kept for interface compatibility.

    Returns:
        List of report lines (strings).
    """
    reasons = ["控制字异常和进水压力异常同时存在,但时间段没有重叠"]

    # Per-event details of the control-word events.
    reasons.append("控制字异常事件详情:")
    for i, event in enumerate(control_events, 1):
        segment_type = " (初始段)" if event.get('is_initial_segment', False) else ""
        reasons.append(
            f" 事件{i}: {event['start_time']} - {event['end_time']} (时长: {event['duration_minutes']:.1f}分钟, 控制字: {event['control_word']}{segment_type})")

    # Per-event details of the pressure events.
    reasons.append("进水压力异常事件详情:")
    for i, event in enumerate(pressure_events, 1):
        reasons.append(
            f" 事件{i}: {event['start_time']} - {event['end_time']} (时长: {event['duration_minutes']:.1f}分钟, 平均压力: {event['avg_pressure']:.3f}MPa)")

    # Overall time envelope of each event kind.
    control_start_min = min(event['start_time'] for event in control_events)
    control_end_max = max(event['end_time'] for event in control_events)
    pressure_start_min = min(event['start_time'] for event in pressure_events)
    pressure_end_max = max(event['end_time'] for event in pressure_events)

    reasons.append(f"控制字异常总体时间范围: {control_start_min.time()} - {control_end_max.time()}")
    reasons.append(f"进水压力异常总体时间范围: {pressure_start_min.time()} - {pressure_end_max.time()}")

    # Describe how the two envelopes relate to each other.
    if control_end_max < pressure_start_min:
        reasons.append("时间关系: 所有控制字异常都在进水压力异常之前结束")
    elif pressure_end_max < control_start_min:
        reasons.append("时间关系: 所有进水压力异常都在控制字异常之前结束")
    else:
        reasons.append("时间关系: 异常事件在时间上有交错,但没有具体时间段重叠")

    return reasons


def mark_cip_events(events, unit_id):
    """Group event dates into CIP intervals.

    Event durations are summed per date. Scanning dates in order, a date
    starts a CIP interval when the total duration over that date and the
    following 10 days exceeds 100 minutes; the interval ends on the last
    event date inside that window and scanning resumes after it.

    Args:
        events: Dict mapping a date to a list of dicts with ``duration_minutes``.
        unit_id: Value stored in the ``unit_id`` column of every record.

    Returns:
        DataFrame with columns ``start_date``, ``end_date``, ``unit_id``.
        The columns are present even when no interval is found, so callers
        can sort or concatenate the result safely.
    """
    columns = ["start_date", "end_date", "unit_id"]

    event_data = [
        {'date': pd.to_datetime(date), 'duration_minutes': event['duration_minutes']}
        for date, event_list in events.items()
        for event in event_list
    ]
    if not event_data:
        return pd.DataFrame(columns=columns)

    daily = (
        pd.DataFrame(event_data)
        .groupby("date", as_index=False)['duration_minutes'].sum()
        .sort_values("date")
        .reset_index(drop=True)  # guarantee positional labels for .loc[i] below
    )

    cip_records = []
    i = 0
    while i < len(daily):
        current_date = daily.loc[i, 'date']
        end_window = current_date + timedelta(days=10)
        window_df = daily[(daily['date'] >= current_date) & (daily['date'] <= end_window)]

        if window_df['duration_minutes'].sum() > 100:
            cip_records.append({
                "start_date": current_date,
                "end_date": window_df['date'].max(),
                "unit_id": unit_id
            })
            # Dates are unique and sorted, so the window is the contiguous
            # run starting at i; skip past it to avoid counting it twice.
            i += len(window_df)
        else:
            i += 1

    return pd.DataFrame(cip_records, columns=columns)


def analyze_unit(unit_id, pressure_df=None):
    """Run the full CIP detection for one unit.

    Steps: find control-word events, find pressure events on those dates,
    keep the dates on which the two kinds overlap, and group those dates
    into CIP intervals.

    Args:
        unit_id: Unit number.
        pressure_df: Optional pre-loaded pressure DataFrame; when omitted the
            pressure CSV is read from disk.

    Returns:
        DataFrame of CIP intervals as produced by ``mark_cip_events``.
    """
    print(f"\n正在处理 {unit_id} 号机组...")

    control_df = read_control_word_data(unit_id)
    target_dates, control_events_by_date = find_control_word_events(control_df, unit_id)

    if pressure_df is None:
        pressure_df = read_pressure_data()
    pressure_events_by_date = find_pressure_events(pressure_df, target_dates, unit_id)

    # Dates on which both kinds of event exist.
    both_exist_dates = [d for d in target_dates if d in pressure_events_by_date]

    matched_control_events = {}
    matched_pressure_events = {}
    for date in both_exist_dates:
        control_events = control_events_by_date.get(date, [])
        pressure_events = pressure_events_by_date.get(date, [])
        # Keep only the dates with at least one overlapping pair.
        if match_events(control_events, pressure_events):
            matched_control_events[date] = control_events
            matched_pressure_events[date] = pressure_events

    # NOTE(review): both dicts share the same date keys, so this merge keeps
    # only the pressure events for each date; the CIP threshold is therefore
    # applied to pressure-event durations. Confirm this is intended.
    combined_events = {**matched_control_events, **matched_pressure_events}
    return mark_cip_events(combined_events, unit_id)


def main():
    """Process units 1-4, merge their CIP intervals and save them to one CSV file."""
    # The pressure log is shared by all units; read it once.
    pressure_df = read_pressure_data()

    all_cip_dfs = [analyze_unit(unit_id, pressure_df) for unit_id in range(1, 5)]

    final_cip_df = pd.concat(all_cip_dfs, ignore_index=True).sort_values(['unit_id', 'start_date'])

    save_path = "CIP_events_all_units.csv"
    final_cip_df.to_csv(save_path, index=False)

    print(f"\n所有机组的 CIP 清洗事件区间已保存到 {save_path}")


if __name__ == "__main__":
    main()