import pandas as pd from datetime import datetime, timedelta import os def read_cip_events(file_path="CIP_events_all_units.csv"): """ 读取 CIP 事件文件 参数: file_path: CIP 事件 CSV 文件路径 返回: DataFrame,包含 unit_id, start_date, end_date """ cip_df = pd.read_csv(file_path, parse_dates=['start_date', 'end_date']) cip_df = cip_df.sort_values(['unit_id', 'start_date']).reset_index(drop=True) return cip_df def filter_cip_events(cip_df, min_gap_days=30): """ 过滤 CIP 事件: 1. 删除与前一 CIP 结束间隔 < min_gap_days 的事件 2. 删除与下一 CIP 开始间隔 < min_gap_days 的事件 参数: cip_df: 原始 CIP 事件 DataFrame min_gap_days: 间隔阈值(天) 返回: 过滤后的 CIP DataFrame """ filtered_all = [] # 按机组处理 for unit_id, group in cip_df.groupby('unit_id'): group = group.sort_values('start_date').reset_index(drop=True) # -------- 第一步:检查前一 CIP 间隔 -------- keep_rows = [] for idx, row in group.iterrows(): if idx == 0: keep_rows.append(row) continue prev_end = group.loc[idx - 1, 'end_date'] if (row['start_date'] - prev_end).days >= min_gap_days: keep_rows.append(row) # 如果间隔小于 min_gap_days,就直接不保留该事件 group = pd.DataFrame(keep_rows).reset_index(drop=True) # -------- 第二步:检查下一 CIP 间隔 -------- keep_rows = [] for idx, row in group.iterrows(): if idx == len(group) - 1: keep_rows.append(row) continue next_start = group.loc[idx + 1, 'start_date'] if (next_start - row['end_date']).days >= min_gap_days: keep_rows.append(row) # 如果与下一 CIP 开始间隔小于 min_gap_days,则不保留该事件 filtered_all.append(pd.DataFrame(keep_rows)) return pd.concat(filtered_all, ignore_index=True) def get_full_stop_days(control_file): """ 计算完全停机天数: - 一整天没有控制字记录的日期视为完全停机 """ df = pd.read_csv(control_file, parse_dates=['time']) df = df.sort_values('time') full_stop_days = [] if df.empty: return full_stop_days # 获取记录的日期范围 start_date = df['time'].dt.date.min() end_date = df['time'].dt.date.max() # 生成连续日期(datetime.date) all_dates = pd.date_range(start=start_date, end=end_date).to_pydatetime() all_dates = [d.date() for d in all_dates] # 转成 datetime.date # 获取已记录的日期 recorded_dates = set(df['time'].dt.date.unique()) # 没有记录的日期即完全停机天 for d in all_dates: if d not in recorded_dates: full_stop_days.append(d) return full_stop_days def calculate_cip_cycles(filtered_cip_df, high_freq_dir="high_freq_cip"): """ 根据 CIP 事件和完全停机天数计算 CIP 周期 参数: filtered_cip_df: 过滤后的 CIP 事件 DataFrame high_freq_dir: 控制字文件所在目录 返回: DataFrame,包含 unit_id, cycle_start, cycle_end, cycle_days """ cip_cycles = [] # 按机组处理 for unit_id, group in filtered_cip_df.groupby('unit_id'): group = group.sort_values('start_date').reset_index(drop=True) # 读取控制字文件,获取完全停机天数 control_file = os.path.join(high_freq_dir, f"C_M_RO{unit_id}_DB_control_word_raw.csv") full_stop_days = get_full_stop_days(control_file) for idx in range(len(group)): # 周期开始时间 = 上一个 CIP 结束的下一天 prev_end = group.loc[idx - 1, 'end_date'] if idx > 0 else None start_cycle = (prev_end + timedelta(days=1)) if prev_end else None # 周期结束时间 = 当前 CIP 开始的前一天 end_cycle = group.loc[idx, 'start_date'] - timedelta(days=1) # 如果没有可计算周期,则跳过 if start_cycle and start_cycle > end_cycle: continue if start_cycle: # 周期天数 = 周期结束日 - 周期开始日 + 1 - 完全停机天数 cycle_days = (end_cycle - start_cycle).days + 1 stop_days = sum(1 for d in full_stop_days if start_cycle.date() <= d <= end_cycle.date()) cycle_days -= stop_days cip_cycles.append({ 'unit_id': unit_id, 'cycle_start': start_cycle, 'cycle_end': end_cycle, 'cycle_days': cycle_days }) return pd.DataFrame(cip_cycles) # ======= 主流程 ======= if __name__ == "__main__": # 1. 读取 CIP 事件文件 cip_file = "CIP_events_all_units.csv" cip_df = read_cip_events(cip_file) # 2. 过滤 CIP 事件(删除间隔小于30天的异常事件) filtered_cip_df = filter_cip_events(cip_df, min_gap_days=30) # 3. 计算 CIP 周期(排除完全停机天数) cip_cycles_df = calculate_cip_cycles(filtered_cip_df, high_freq_dir="high_freq_cip") # 4. 保存结果 cip_cycles_df.to_csv("CIP_cycles_with_days.csv", index=False) print("CIP 周期计算完成,结果已保存为 CIP_cycles_with_days.csv")