| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- import pandas as pd
- from datetime import datetime, timedelta
- import os
- def read_cip_events(file_path="CIP_events_all_units.csv"):
- """
- 读取 CIP 事件文件
- 参数:
- file_path: CIP 事件 CSV 文件路径
- 返回:
- DataFrame,包含 unit_id, start_date, end_date
- """
- cip_df = pd.read_csv(file_path, parse_dates=['start_date', 'end_date'])
- cip_df = cip_df.sort_values(['unit_id', 'start_date']).reset_index(drop=True)
- return cip_df
- def filter_cip_events(cip_df, min_gap_days=30):
- """
- 过滤 CIP 事件:
- 1. 删除与前一 CIP 结束间隔 < min_gap_days 的事件
- 2. 删除与下一 CIP 开始间隔 < min_gap_days 的事件
- 参数:
- cip_df: 原始 CIP 事件 DataFrame
- min_gap_days: 间隔阈值(天)
- 返回:
- 过滤后的 CIP DataFrame
- """
- filtered_all = []
- # 按机组处理
- for unit_id, group in cip_df.groupby('unit_id'):
- group = group.sort_values('start_date').reset_index(drop=True)
- # -------- 第一步:检查前一 CIP 间隔 --------
- keep_rows = []
- for idx, row in group.iterrows():
- if idx == 0:
- keep_rows.append(row)
- continue
- prev_end = group.loc[idx - 1, 'end_date']
- if (row['start_date'] - prev_end).days >= min_gap_days:
- keep_rows.append(row)
- # 如果间隔小于 min_gap_days,就直接不保留该事件
- group = pd.DataFrame(keep_rows).reset_index(drop=True)
- # -------- 第二步:检查下一 CIP 间隔 --------
- keep_rows = []
- for idx, row in group.iterrows():
- if idx == len(group) - 1:
- keep_rows.append(row)
- continue
- next_start = group.loc[idx + 1, 'start_date']
- if (next_start - row['end_date']).days >= min_gap_days:
- keep_rows.append(row)
- # 如果与下一 CIP 开始间隔小于 min_gap_days,则不保留该事件
- filtered_all.append(pd.DataFrame(keep_rows))
- return pd.concat(filtered_all, ignore_index=True)
- def get_full_stop_days(control_file):
- """
- 计算完全停机天数:
- - 一整天没有控制字记录的日期视为完全停机
- """
- df = pd.read_csv(control_file, parse_dates=['time'])
- df = df.sort_values('time')
- full_stop_days = []
- if df.empty:
- return full_stop_days
- # 获取记录的日期范围
- start_date = df['time'].dt.date.min()
- end_date = df['time'].dt.date.max()
- # 生成连续日期(datetime.date)
- all_dates = pd.date_range(start=start_date, end=end_date).to_pydatetime()
- all_dates = [d.date() for d in all_dates] # 转成 datetime.date
- # 获取已记录的日期
- recorded_dates = set(df['time'].dt.date.unique())
- # 没有记录的日期即完全停机天
- for d in all_dates:
- if d not in recorded_dates:
- full_stop_days.append(d)
- return full_stop_days
- def calculate_cip_cycles(filtered_cip_df, high_freq_dir="high_freq_cip"):
- """
- 根据 CIP 事件和完全停机天数计算 CIP 周期
- 参数:
- filtered_cip_df: 过滤后的 CIP 事件 DataFrame
- high_freq_dir: 控制字文件所在目录
- 返回:
- DataFrame,包含 unit_id, cycle_start, cycle_end, cycle_days
- """
- cip_cycles = []
- # 按机组处理
- for unit_id, group in filtered_cip_df.groupby('unit_id'):
- group = group.sort_values('start_date').reset_index(drop=True)
- # 读取控制字文件,获取完全停机天数
- control_file = os.path.join(high_freq_dir, f"C_M_RO{unit_id}_DB_control_word_raw.csv")
- full_stop_days = get_full_stop_days(control_file)
- for idx in range(len(group)):
- # 周期开始时间 = 上一个 CIP 结束的下一天
- prev_end = group.loc[idx - 1, 'end_date'] if idx > 0 else None
- start_cycle = (prev_end + timedelta(days=1)) if prev_end else None
- # 周期结束时间 = 当前 CIP 开始的前一天
- end_cycle = group.loc[idx, 'start_date'] - timedelta(days=1)
- # 如果没有可计算周期,则跳过
- if start_cycle and start_cycle > end_cycle:
- continue
- if start_cycle:
- # 周期天数 = 周期结束日 - 周期开始日 + 1 - 完全停机天数
- cycle_days = (end_cycle - start_cycle).days + 1
- stop_days = sum(1 for d in full_stop_days if start_cycle.date() <= d <= end_cycle.date())
- cycle_days -= stop_days
- cip_cycles.append({
- 'unit_id': unit_id,
- 'cycle_start': start_cycle,
- 'cycle_end': end_cycle,
- 'cycle_days': cycle_days
- })
- return pd.DataFrame(cip_cycles)
- # ======= 主流程 =======
- if __name__ == "__main__":
- # 1. 读取 CIP 事件文件
- cip_file = "CIP_events_all_units.csv"
- cip_df = read_cip_events(cip_file)
- # 2. 过滤 CIP 事件(删除间隔小于30天的异常事件)
- filtered_cip_df = filter_cip_events(cip_df, min_gap_days=30)
- # 3. 计算 CIP 周期(排除完全停机天数)
- cip_cycles_df = calculate_cip_cycles(filtered_cip_df, high_freq_dir="high_freq_cip")
- # 4. 保存结果
- cip_cycles_df.to_csv("CIP_cycles_with_days.csv", index=False)
- print("CIP 周期计算完成,结果已保存为 CIP_cycles_with_days.csv")
|