||
- import pandas as pd
- import os
- from datetime import datetime, timedelta
- import numpy as np
- def read_control_word_data(unit_id):
- """读取控制字数据"""
- file_path = f'datasets/RO_data_logs/high_freq_cip/C_M_RO{unit_id}_DB_control_word_raw.csv'
- df = pd.read_csv(file_path)
- df['time'] = pd.to_datetime(df['time'])
- df = df.sort_values('time')
- df['date'] = df['time'].dt.date
- return df
- def read_pressure_data():
- """读取进水压力数据"""
- file_path = 'datasets/RO_data_logs/high_freq_cip/cip_sensors_1min_merged.csv'
- df = pd.read_csv(file_path)
- df['time'] = pd.to_datetime(df['time'])
- df = df.sort_values('time')
- df['date'] = df['time'].dt.date
- return df
- def find_control_word_events(df, unit_id):
- """查找持续时间超过 3h且控制字不为24.0的控制字异常事件"""
- result_days = []
- control_events_by_date = {}
- # 按日期分组
- dates = sorted(df['date'].unique())
- # 存储前一天的最后控制字
- prev_day_last_control_word = None
- for i, current_date in enumerate(dates):
- # 获取当前日期的数据
- day_data = df[df['date'] == current_date].copy()
- day_data = day_data.sort_values('time').reset_index(drop=True)
- day_events = []
- # 处理当前日期的第一个事件段(00:00:00 - 第一个记录时间)
- first_record_time = day_data.iloc[0]['time']
- first_record_control_word = day_data.iloc[0][f'C.M.RO{unit_id}_DB@control_word']
- # 确定第一个事件段的控制字
- if i == 0: # 第一天,没有前一天
- first_segment_control_word = 0 # 按照指导,第一天初始为停机状态
- else:
- first_segment_control_word = prev_day_last_control_word
- # 计算第一个事件段的持续时间(00:00:00 到 第一个记录时间)
- day_start = datetime.combine(current_date, datetime.min.time())
- first_segment_duration = (first_record_time - day_start).total_seconds() / 60
- # 如果第一个事件段持续时间超过 3h且控制字不为24.0,则记录
- if (first_segment_duration > 180 and
- first_segment_control_word != 24.0 and
- first_segment_control_word is not None):
- day_events.append({
- 'start_time': day_start,
- 'end_time': first_record_time,
- 'duration_minutes': first_segment_duration,
- 'control_word': first_segment_control_word,
- 'is_initial_segment': True
- })
- # 处理当前日期内的其他事件段
- current_control_word = first_record_control_word
- current_start_time = first_record_time
- for j in range(1, len(day_data)):
- current_time = day_data.iloc[j]['time']
- current_control = day_data.iloc[j][f'C.M.RO{unit_id}_DB@control_word']
- if current_control != current_control_word:
- # 状态变化,记录前一个事件段
- duration = (current_time - current_start_time).total_seconds() / 60
- if duration > 180 and current_control_word != 24.0:
- day_events.append({
- 'start_time': current_start_time,
- 'end_time': current_time,
- 'duration_minutes': duration,
- 'control_word': current_control_word,
- 'is_initial_segment': False
- })
- current_control_word = current_control
- current_start_time = current_time
- # 处理最后一个事件段(最后一个记录时间到23:59:59)
- last_record_time = day_data.iloc[-1]['time']
- last_control_word = day_data.iloc[-1][f'C.M.RO{unit_id}_DB@control_word']
- day_end = datetime.combine(current_date, datetime.max.time()).replace(hour=23, minute=59, second=59)
- last_segment_duration = (day_end - last_record_time).total_seconds() / 60
- if (last_segment_duration > 180 and
- last_control_word != 24.0):
- day_events.append({
- 'start_time': last_record_time,
- 'end_time': day_end,
- 'duration_minutes': last_segment_duration,
- 'control_word': last_control_word,
- 'is_initial_segment': False
- })
- # 保存前一天的最后控制字
- prev_day_last_control_word = last_control_word
- # 如果有任何异常事件,添加到结果中
- if day_events:
- result_days.append(current_date)
- control_events_by_date[current_date] = day_events
- return result_days, control_events_by_date
- def find_pressure_events(pressure_df, target_dates, unit_id):
- """查找进水压力异常事件"""
- pressure_events = {}
- for date in target_dates:
- # 筛选指定日期的数据
- day_data = pressure_df[pressure_df['date'] == date].copy()
- if len(day_data) == 0:
- continue
- day_data = day_data.sort_values('time')
- day_data['pressure_valid'] = day_data[f'C.M.RO{unit_id}_PT_JS@out'].between(0.05, 0.5)
- events = []
- current_streak = 0
- gap_count = 0
- event_start = None
- event_end = None
- for i, row in day_data.iterrows():
- if row['pressure_valid']:
- if current_streak == 0:
- event_start = row['time']
- current_streak += 1
- gap_count = 0 # 重置间断计数
- event_end = row['time']
- else:
- if current_streak > 0:
- gap_count += 1
- if gap_count > 5: # 超过5分钟间断,结束当前事件
- duration = (event_end - event_start).total_seconds() / 60 + 1 # 加上最后1分钟
- if duration >= 20:
- events.append({
- 'start_time': event_start,
- 'end_time': event_end,
- 'duration_minutes': duration,
- 'avg_pressure': day_data.loc[
- (day_data['time'] >= event_start) &
- (day_data['time'] <= event_end) &
- day_data['pressure_valid']
- ][f'C.M.RO{unit_id}_PT_JS@out'].mean()
- })
- current_streak = 0
- gap_count = 0
- # 检查最后一段
- if current_streak > 0:
- duration = (event_end - event_start).total_seconds() / 60 + 1
- if duration >= 20:
- events.append({
- 'start_time': event_start,
- 'end_time': event_end,
- 'duration_minutes': duration,
- 'avg_pressure': day_data.loc[
- (day_data['time'] >= event_start) &
- (day_data['time'] <= event_end) &
- day_data['pressure_valid']
- ][f'C.M.RO{unit_id}_PT_JS@out'].mean()
- })
- if events:
- pressure_events[date] = events
- return pressure_events
- def match_events(control_events, pressure_events):
- """匹配控制字异常和进水压力异常的时间段"""
- matched_results = []
- for control_event in control_events:
- for pressure_event in pressure_events:
- # 计算时间段的交集
- overlap_start = max(control_event['start_time'], pressure_event['start_time'])
- overlap_end = min(control_event['end_time'], pressure_event['end_time'])
- # 如果有交集且交集时间大于0
- if overlap_start < overlap_end:
- overlap_duration = (overlap_end - overlap_start).total_seconds() / 60
- if overlap_duration > 0:
- matched_results.append({
- 'control_start': control_event['start_time'],
- 'control_end': control_event['end_time'],
- 'pressure_start': pressure_event['start_time'],
- 'pressure_end': pressure_event['end_time'],
- 'overlap_start': overlap_start,
- 'overlap_end': overlap_end,
- 'overlap_duration_minutes': overlap_duration,
- 'control_word': control_event['control_word'],
- 'avg_pressure': pressure_event['avg_pressure']
- })
- return matched_results
- def calculate_total_matched_time(matched_events):
- """计算匹配的总时长,避免重复计算重叠时间"""
- if not matched_events:
- return 0, []
- # 合并重叠的时间段
- time_intervals = []
- for event in matched_events:
- time_intervals.append((event['overlap_start'], event['overlap_end']))
- # 按开始时间排序
- time_intervals.sort(key=lambda x: x[0])
- # 合并重叠区间
- merged_intervals = []
- current_start, current_end = time_intervals[0]
- for start, end in time_intervals[1:]:
- if start <= current_end:
- current_end = max(current_end, end)
- else:
- merged_intervals.append((current_start, current_end))
- current_start, current_end = start, end
- merged_intervals.append((current_start, current_end))
- # 计算总时长
- total_duration = 0
- for start, end in merged_intervals:
- total_duration += (end - start).total_seconds() / 60
- return total_duration, merged_intervals
- def analyze_both_exist_but_no_match(control_events, pressure_events, date):
- """分析同时存在两种异常但时间段不重叠的原因"""
- reasons = []
- reasons.append("控制字异常和进水压力异常同时存在,但时间段没有重叠")
- # 详细列出控制字异常事件
- reasons.append("控制字异常事件详情:")
- for i, event in enumerate(control_events, 1):
- segment_type = " (初始段)" if event.get('is_initial_segment', False) else ""
- reasons.append(
- f" 事件{i}: {event['start_time']} - {event['end_time']} (时长: {event['duration_minutes']:.1f}分钟, 控制字: {event['control_word']}{segment_type})")
- # 详细列出进水压力异常事件
- reasons.append("进水压力异常事件详情:")
- for i, event in enumerate(pressure_events, 1):
- reasons.append(
- f" 事件{i}: {event['start_time']} - {event['end_time']} (时长: {event['duration_minutes']:.1f}分钟, 平均压力: {event['avg_pressure']:.3f}MPa)")
- # 分析时间范围
- control_start_min = min(event['start_time'] for event in control_events)
- control_end_max = max(event['end_time'] for event in control_events)
- pressure_start_min = min(event['start_time'] for event in pressure_events)
- pressure_end_max = max(event['end_time'] for event in pressure_events)
- reasons.append(f"控制字异常总体时间范围: {control_start_min.time()} - {control_end_max.time()}")
- reasons.append(f"进水压力异常总体时间范围: {pressure_start_min.time()} - {pressure_end_max.time()}")
- # 检查是否有时间重叠的可能性
- if control_end_max < pressure_start_min:
- reasons.append("时间关系: 所有控制字异常都在进水压力异常之前结束")
- elif pressure_end_max < control_start_min:
- reasons.append("时间关系: 所有进水压力异常都在控制字异常之前结束")
- else:
- reasons.append("时间关系: 异常事件在时间上有交错,但没有具体时间段重叠")
- return reasons
- def mark_cip_events(events, unit_id):
- """标记CIP事件的开始和结束"""
- # 整理事件数据
- event_data = []
- for date, event_list in events.items():
- for event in event_list:
- event_data.append({
- 'date': pd.to_datetime(date),
- 'duration_minutes': event['duration_minutes'],
- })
- df = pd.DataFrame(event_data)
- if df.empty:
- return pd.DataFrame(columns=["start_date", "end_date", "unit_id"])
- df = df.groupby("date", as_index=False)['duration_minutes'].sum().sort_values("date")
- cip_records = []
- i = 0
- while i < len(df):
- current_date = df.loc[i, 'date']
- # 计算从当前日开始的后 10 天累计时长
- end_window = current_date + timedelta(days=10)
- window_df = df[(df['date'] >= current_date) & (df['date'] <= end_window)]
- total_duration = window_df['duration_minutes'].sum()
- if total_duration > 100:
- # CIP 开始日
- start_date = current_date
- # CIP 结束日 = 窗口中最后一天有事件的日期
- end_date = window_df['date'].max()
- cip_records.append({
- "start_date": start_date,
- "end_date": end_date,
- "unit_id": unit_id
- })
- # 跳到结束日之后,避免重复计入
- i = df.index[df['date'] > end_date].min() if (df['date'] > end_date).any() else len(df)
- else:
- i += 1
- return pd.DataFrame(cip_records)
- def analyze_unit(unit_id):
- """
- 分析单个机组:
- - 查找控制字异常 & 压力异常
- - 匹配异常事件
- - 对匹配成功的事件进行 CIP 筛选
- - 返回 CIP 事件 DataFrame
- """
- print(f"\n正在处理 {unit_id} 号机组...")
- # 读取控制字数据
- control_df = read_control_word_data(unit_id)
- target_dates, control_events_by_date = find_control_word_events(control_df, unit_id)
- # 读取压力数据
- pressure_df = read_pressure_data()
- pressure_events_by_date = find_pressure_events(pressure_df, target_dates, unit_id)
- # 找出两种异常都存在的日期
- both_exist_dates = [d for d in target_dates if d in pressure_events_by_date]
- # 匹配结果收集
- matched_control_events = {}
- matched_pressure_events = {}
- for date in both_exist_dates:
- control_events = control_events_by_date.get(date, [])
- pressure_events = pressure_events_by_date.get(date, [])
- matched_events = match_events(control_events, pressure_events)
- if matched_events: # 只保留匹配成功的
- matched_control_events[date] = control_events
- matched_pressure_events[date] = pressure_events
- # 只对匹配成功的日期做 CIP 判断
- combined_events = {**matched_control_events, **matched_pressure_events}
- cip_df = mark_cip_events(combined_events, unit_id)
- return cip_df
- def main():
- """
- - 依次处理 1–4 号机组
- - 合并所有机组的 CIP 事件
- - 保存到一个 CSV 文件
- """
- all_cip_dfs = []
- for unit_id in range(1, 5):
- cip_df = analyze_unit(unit_id)
- all_cip_dfs.append(cip_df)
- # 合并结果
- final_cip_df = pd.concat(all_cip_dfs, ignore_index=True).sort_values(['unit_id', 'start_date'])
- # 保存到 CSV
- save_path = "CIP_events_all_units.csv"
- final_cip_df.to_csv(save_path, index=False)
- print(f"\n所有机组的 CIP 清洗事件区间已保存到 {save_path}")
- if __name__ == "__main__":
- main()
|