| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- import pandas as pd
- from datetime import timedelta
# =========================
# 1. Load the label data
# =========================
def load_labels(file_path):
    """Read the label CSV and normalise its ``date`` and ``unit_id`` columns.

    Args:
        file_path: Handed unchanged to ``pandas.read_csv``. The CSV must
            contain ``date`` and ``unit_id`` columns.

    Returns:
        A DataFrame with the CSV contents in which ``date`` has been parsed
        with ``pd.to_datetime`` and ``unit_id`` has been cast to ``str`` with
        surrounding whitespace stripped.
    """
    raw = pd.read_csv(file_path)
    # assign() overwrites the two existing columns in place (same position).
    return raw.assign(
        date=pd.to_datetime(raw['date']),
        unit_id=raw['unit_id'].astype(str).str.strip(),
    )
# =========================
# 2. Extract high-confidence CIP dates (filtered per month)
# =========================
def get_cip_anchor_days(df_unit):
    """Pick the anchor dates of one unit, month by month.

    For every calendar month present in ``df_unit['date']``:

    * a month with a single row contributes that row's date;
    * a month with several rows contributes only the dates whose ``label``
      equals 0, or all of its dates when no row has label 0.

    NOTE(review): label 0 is presumably the high-confidence class mentioned
    in the section header -- confirm against the labelling step.

    Args:
        df_unit: DataFrame with a datetime ``date`` column and a ``label``
            column. It is copied, so the caller's frame is not modified.

    Returns:
        The selected dates as a list sorted in ascending order.
    """
    frame = df_unit.copy()
    frame['year_month'] = frame['date'].dt.to_period('M')

    anchors = []
    for _, month_rows in frame.groupby('year_month'):
        month_dates = month_rows['date']
        if len(month_rows) > 1:
            zero_label_dates = month_dates[month_rows['label'] == 0]
            if not zero_label_dates.empty:
                month_dates = zero_label_dates
        anchors.extend(month_dates.tolist())

    # One final sort makes any per-month ordering irrelevant.
    anchors.sort()
    return anchors
# =========================
# 3. Merge consecutive CIP days
# =========================
def merge_consecutive_days(dates):
    """Collapse runs of adjacent days into ``(start, end)`` intervals.

    Two neighbouring dates belong to the same run when the ``.days`` of their
    difference is at most 1 (so duplicates are absorbed as well).

    The input is sorted first: with unsorted input a negative day difference
    would also satisfy ``<= 1`` and produce inverted or over-merged intervals.

    Args:
        dates: Iterable of date-like objects that support subtraction
            yielding an object with a ``.days`` attribute (e.g.
            ``datetime.date``, ``datetime.datetime``, ``pandas.Timestamp``).
            Need not be sorted.

    Returns:
        A list of ``(start, end)`` tuples in ascending order; an empty list
        when ``dates`` is empty. A date with no neighbour yields
        ``(date, date)``.
    """
    ordered = sorted(dates)
    if not ordered:
        return []

    merged = []
    start = prev = ordered[0]
    for day in ordered[1:]:
        if (day - prev).days > 1:
            # Gap found: close the current run and open a new one.
            merged.append((start, prev))
            start = day
        prev = day
    merged.append((start, prev))
    return merged
# =========================
# 4. Build cycles + filtering
# =========================
def build_cycles(cip_events, unit_id, *, min_cycle_days=25, max_cycle_days=150):
    """Build the cycles lying between consecutive CIP events of one unit.

    For each pair of neighbouring events a candidate cycle runs from two days
    after the earlier event's end to one day before the later event's start.
    Candidates whose start falls after their end are skipped silently;
    candidates whose length is outside ``[min_cycle_days, max_cycle_days]``
    are skipped with a printed notice. Kept cycles are numbered 1, 2, ... in
    order of appearance.

    Args:
        cip_events: Sequence of ``(start, end)`` tuples in chronological
            order, with values that support ``timedelta`` arithmetic.
        unit_id: Identifier copied into every output row and log line.
        min_cycle_days: Smallest accepted cycle length in days (inclusive).
        max_cycle_days: Largest accepted cycle length in days (inclusive).

    Returns:
        A DataFrame with columns ``unit_id``, ``cycle_id``, ``start_date``,
        ``end_date`` and ``length_days``; an empty, column-less DataFrame
        when no cycle qualifies.
    """
    cycles = []
    neighbours = zip(cip_events, cip_events[1:])
    for candidate_no, (prev_event, next_event) in enumerate(neighbours, start=1):
        # NOTE(review): the start is offset by 2 days but the end by only 1;
        # the asymmetry is presumably intentional -- confirm.
        cycle_start = prev_event[1] + timedelta(days=2)
        cycle_end = next_event[0] - timedelta(days=1)
        if cycle_start > cycle_end:
            continue

        length_days = (cycle_end - cycle_start).days + 1
        if min_cycle_days <= length_days <= max_cycle_days:
            cycles.append({
                "unit_id": unit_id,
                # Only accepted cycles consume an id.
                "cycle_id": len(cycles) + 1,
                "start_date": cycle_start,
                "end_date": cycle_end,
                "length_days": length_days,
            })
        else:
            # The message is built from the thresholds so it cannot drift.
            print(
                f"[过滤] 机组{unit_id} 周期候选{candidate_no} "
                f"长度{length_days}天(不在{min_cycle_days}–{max_cycle_days}范围)"
            )
    return pd.DataFrame(cycles)
# =========================
# 5. Per-unit processing (core bug fixed)
# =========================
def process_unit(df, unit_id):
    """Run the anchor-day, merge and cycle-building steps for one unit.

    Args:
        df: Label DataFrame with a ``unit_id`` column (plus whatever
            ``get_cip_anchor_days`` reads).
        unit_id: Value matched against ``df['unit_id']`` with ``==``.

    Returns:
        The DataFrame produced by ``build_cycles`` for this unit, or an
        empty DataFrame when ``df`` has no rows for ``unit_id``.
    """
    print(f"\n处理机组 {unit_id}...")

    unit_rows = df.loc[df['unit_id'] == unit_id].copy()
    if unit_rows.empty:
        print(f"机组 {unit_id} 无数据")
        return pd.DataFrame()

    anchor_days = get_cip_anchor_days(unit_rows)
    events = merge_consecutive_days(anchor_days)
    return build_cycles(events, unit_id)
# =========================
# 6. Main program
# =========================
def main():
    """Build the cycles of every unit and write them to one CSV.

    Reads the label CSV, processes each distinct ``unit_id`` in sorted order,
    concatenates the non-empty results, sorts them by unit and cycle id,
    reduces the start/end columns to plain dates and writes the output CSV.
    Prints per-unit length statistics at the end. Returns early, without
    writing anything, when no unit yields a cycle.
    """
    input_path = "../use_data/cip_day_labels_all_units.csv"
    output_path = "../use_data/cip_cycles_all_units.csv"

    labels = load_labels(input_path)

    per_unit_frames = []
    for unit_id in sorted(labels['unit_id'].unique()):
        unit_cycles = process_unit(labels, unit_id)
        if unit_cycles.empty:
            print(f"机组{unit_id}无有效周期")
            continue
        per_unit_frames.append(unit_cycles)

    if not per_unit_frames:
        print("未检测到任何有效周期")
        return

    combined = pd.concat(per_unit_frames, ignore_index=True)
    combined = combined.sort_values(['unit_id', 'cycle_id'])
    # Drop the time-of-day part before the values are written out.
    for column in ('start_date', 'end_date'):
        combined[column] = combined[column].dt.date

    combined.to_csv(output_path, index=False)
    print(f"\nCIP周期已保存到: {output_path}")

    print("\n周期长度统计:")
    print(combined.groupby('unit_id')['length_days'].describe())
# Script entry point: run main() only when this file is executed directly.
if __name__ == "__main__":
    main()
|