from datetime import timedelta

import pandas as pd


# =========================
# 1. Load the label data
# =========================
def load_labels(file_path) -> pd.DataFrame:
    """Read the per-day label CSV and normalise its key columns.

    Args:
        file_path: Anything ``pandas.read_csv`` accepts (path or file-like
            object). The data must provide ``date`` and ``unit_id`` columns.

    Returns:
        The loaded frame with ``date`` parsed to datetimes and ``unit_id``
        converted to whitespace-stripped strings.
    """
    df = pd.read_csv(file_path)
    df['date'] = pd.to_datetime(df['date'])
    # Unit ids are compared as strings later on, so normalise them once here.
    df['unit_id'] = df['unit_id'].astype(str).str.strip()
    return df


# =========================
# 2. Extract high-confidence CIP dates (filtered per month)
# =========================
def get_cip_anchor_days(df_unit: pd.DataFrame) -> list:
    """Select the anchor dates of one unit, calendar month by calendar month.

    Selection rule applied to each month:
      * a month with exactly one row keeps that row's date, whatever its label;
      * a month with several rows keeps only the rows whose ``label`` is 0;
      * if a multi-row month has no ``label == 0`` row, all its dates are kept.

    Args:
        df_unit: Rows of a single unit with a datetime ``date`` column and a
            ``label`` column. The frame is not modified.

    Returns:
        The selected dates as a sorted list.
    """
    # assign() works on a copy, so the caller's frame never gains the helper
    # column.
    with_month = df_unit.assign(year_month=df_unit['date'].dt.to_period('M'))

    selected_dates = []
    # groupby already yields the month keys in sorted order; no extra sorted()
    # (which would compare (Period, DataFrame) tuples) is needed.
    for _, month_rows in with_month.groupby('year_month', sort=True):
        chosen = month_rows
        if len(month_rows) > 1:
            zero_rows = month_rows[month_rows['label'] == 0]
            if not zero_rows.empty:
                chosen = zero_rows
        selected_dates.extend(chosen['date'].tolist())

    return sorted(selected_dates)


# =========================
# 3. Merge consecutive CIP days
# =========================
def merge_consecutive_days(dates) -> list:
    """Collapse runs of consecutive days into ``(start, end)`` events.

    Two neighbouring dates belong to the same run when the difference between
    them is at most one day (duplicates therefore merge as well).

    Args:
        dates: Iterable of date-like values that support subtraction yielding
            an object with a ``.days`` attribute (e.g. ``pandas.Timestamp``).
            The order does not matter; values are sorted internally.

    Returns:
        A list of ``(start, end)`` tuples in chronological order; an empty
        list for empty input.
    """
    # Sorting here makes the run detection independent of the caller's order;
    # unsorted input used to produce negative gaps and bogus merges.
    ordered = sorted(dates)
    if not ordered:
        return []

    merged = []
    start = prev = ordered[0]
    for day in ordered[1:]:
        if (day - prev).days > 1:
            # Gap found: close the current run and open a new one.
            merged.append((start, prev))
            start = day
        prev = day
    merged.append((start, prev))
    return merged


# =========================
# 4. Build cycles + filter
# =========================
def build_cycles(cip_events, unit_id, *, min_cycle_days=25, max_cycle_days=150):
    """Turn the gaps between successive CIP events into operating cycles.

    For every pair of neighbouring events the candidate cycle starts two days
    after the earlier event's end and finishes the day before the later
    event's start. Candidates with no days left are skipped silently;
    candidates whose length falls outside ``[min_cycle_days, max_cycle_days]``
    are reported on stdout and dropped.

    Args:
        cip_events: Chronological sequence of ``(start, end)`` tuples.
        unit_id: Identifier written to every output row and log message.
        min_cycle_days: Smallest accepted cycle length in days (inclusive).
        max_cycle_days: Largest accepted cycle length in days (inclusive).

    Returns:
        A DataFrame with columns ``unit_id``, ``cycle_id``, ``start_date``,
        ``end_date`` and ``length_days``. ``cycle_id`` numbers the accepted
        cycles from 1. The columns are present even when no cycle qualifies.
    """
    columns = ["unit_id", "cycle_id", "start_date", "end_date", "length_days"]
    cycles = []

    neighbours = zip(cip_events, cip_events[1:])
    for candidate_no, (prev_event, next_event) in enumerate(neighbours, start=1):
        cycle_start = prev_event[1] + timedelta(days=2)
        cycle_end = next_event[0] - timedelta(days=1)
        if cycle_start > cycle_end:
            # Events too close together: nothing left between them.
            continue

        length_days = (cycle_end - cycle_start).days + 1  # inclusive of both ends
        if not min_cycle_days <= length_days <= max_cycle_days:
            # The reported range follows the actual thresholds, so the message
            # cannot drift from the filter.
            print(
                f"[过滤] 机组{unit_id} 周期候选{candidate_no} "
                f"长度{length_days}天(不在{min_cycle_days}–{max_cycle_days}范围)"
            )
            continue

        cycles.append({
            "unit_id": unit_id,
            "cycle_id": len(cycles) + 1,
            "start_date": cycle_start,
            "end_date": cycle_end,
            "length_days": length_days,
        })

    return pd.DataFrame(cycles, columns=columns)


# =========================
# 5.
# Per-unit processing (core bug already fixed)
# =========================
def process_unit(df, unit_id):
    """Derive the cycle table for a single unit.

    Args:
        df: Label frame containing at least a ``unit_id`` column; rows are
            matched against ``unit_id`` with ``==``.
        unit_id: Identifier of the unit to process.

    Returns:
        The frame produced by ``build_cycles`` for this unit, or an empty
        ``DataFrame`` when the unit has no rows in ``df``.
    """
    print(f"\n处理机组 {unit_id}...")

    df_unit = df[df['unit_id'] == unit_id].copy()
    if df_unit.empty:
        print(f"机组 {unit_id} 无数据")
        return pd.DataFrame()

    # Pipeline: anchor days -> merged CIP events -> filtered cycles.
    cip_days = get_cip_anchor_days(df_unit)
    cip_events = merge_consecutive_days(cip_days)
    return build_cycles(cip_events, unit_id)


# =========================
# 6. Main program
# =========================
def main(input_path="../use_data/cip_day_labels_all_units.csv",
         output_path="../use_data/cip_cycles_all_units.csv"):
    """Build the cycle table for every unit and write it to CSV.

    Args:
        input_path: Location of the per-day label CSV read by ``load_labels``.
        output_path: Destination of the combined cycle CSV.

    Returns:
        None. When no unit yields a valid cycle, a message is printed and no
        file is written.
    """
    df = load_labels(input_path)

    all_cycles = []
    for unit_id in sorted(df['unit_id'].unique()):
        cycles_df = process_unit(df, unit_id)
        if cycles_df.empty:
            print(f"机组{unit_id}无有效周期")
        else:
            all_cycles.append(cycles_df)

    if not all_cycles:
        print("未检测到任何有效周期")
        return

    final_df = pd.concat(all_cycles, ignore_index=True)
    final_df = final_df.sort_values(['unit_id', 'cycle_id'])

    # Store plain calendar dates rather than full timestamps in the CSV.
    final_df['start_date'] = final_df['start_date'].dt.date
    final_df['end_date'] = final_df['end_date'].dt.date

    final_df.to_csv(output_path, index=False)
    print(f"\nCIP周期已保存到: {output_path}")

    print("\n周期长度统计:")
    print(final_df.groupby('unit_id')['length_days'].describe())


if __name__ == "__main__":
    main()