import pandas as pd from datetime import datetime, timedelta import warnings import os warnings.filterwarnings('ignore') def read_cip_cycles(file_path="CIP_cycles_with_days.csv"): """ 读取 CIP 周期 CSV 文件 返回 DataFrame: unit_id, cycle_start, cycle_end, cycle_days """ file_path = os.path.join(os.path.dirname(__file__), file_path) df = pd.read_csv(file_path, parse_dates=['cycle_start', 'cycle_end']) df = df.sort_values(['unit_id', 'cycle_start']).reset_index(drop=True) return df def find_last_year_cycle(cip_df, unit_id, target_date, min_days=45, max_days=120): """ 根据目标日期匹配去年 CIP 周期天数,并保证周期天数在[min_days, max_days]范围内 参数: cip_df: CIP 周期 DataFrame unit_id: 用户指定机组 target_date: datetime 对象(今年日期) min_days: 最小周期天数 max_days: 最大周期天数 返回: 对应去年周期天数 (int) """ last_year = target_date.year - 1 last_year_date = datetime(last_year, target_date.month, target_date.day) # 筛选该机组去年周期 unit_df = cip_df[cip_df['unit_id'] == unit_id] last_year_df = unit_df[(unit_df['cycle_start'].dt.year <= last_year) & (unit_df['cycle_end'].dt.year >= last_year)] if last_year_df.empty: raise ValueError(f"机组 {unit_id} 去年没有 CIP 周期记录") # 先尝试匹配包含日期的周期,且在范围内 for _, row in last_year_df.iterrows(): if row['cycle_start'].date() <= last_year_date.date() <= row['cycle_end'].date(): if min_days <= row['cycle_days'] <= max_days: return row['cycle_days'] # 如果没有匹配,按距离排序找最近周期 last_year_df['distance'] = last_year_df.apply( lambda r: min(abs((r['cycle_start'].date() - last_year_date.date()).days), abs((r['cycle_end'].date() - last_year_date.date()).days)), axis=1 ) # 按距离从小到大排序,依次找符合天数范围的周期 last_year_df_sorted = last_year_df.sort_values('distance') for _, row in last_year_df_sorted.iterrows(): if min_days <= row['cycle_days'] <= max_days: return row['cycle_days'] # 如果都不符合,报错 raise ValueError(f"机组 {unit_id} 去年没有符合 {min_days}-{max_days} 天的 CIP 周期") def depreciate_from_previous_year(last_year_days, membrane_life_years, final_reduction_rate, current_year, previous_year=None): """ 根据去年的已折旧周期计算今年的周期(假设每年按线性增量缩短,最终累计为 final_reduction_rate) 参数: last_year_days: 去年观测到的周期天数(不是原始第1年的值) membrane_life_years: 膜总寿命(年) final_reduction_rate: 最后一年相对于原始的累计缩短率(例如 0.4) current_year: 膜的当前使用年份(从1开始) previous_year: 已有周期对应的膜使用年份索引(若为 None,则默认 previous_year = current_year - 1) 返回: 折旧后的周期天数(int) """ if previous_year is None: previous_year = current_year - 1 if previous_year < 1 or current_year < 1: raise ValueError("年份索引必须 >= 1") if membrane_life_years <= 1: raise ValueError("membrane_life_years 必须大于 1") # 每年的缩短比例(线性分摊到 membrane_life_years-1 年) annual_reduction = final_reduction_rate / (membrane_life_years - 1) # 去年、今年相对于原始的累计缩短率 r_prev = annual_reduction * (previous_year - 1) r_curr = annual_reduction * (current_year - 1) # 安全检查,避免除以 0 或负值 if (1 - r_prev) <= 0: raise ValueError("去年累计缩短率使得 (1 - r_prev) <= 0,无法反推原始值。检查参数。") if r_curr >= 1: raise ValueError("今年累计缩短率 >=100%,结果不合理。检查参数。") # 公式:D_current = D_prev * (1 - r_curr) / (1 - r_prev) current_days = last_year_days * (1 - r_curr) / (1 - r_prev) return int(round(current_days)) def main(unit_id, reference_date=None): membrane_life_years = 5 final_reduction_rate = 0.4 current_year = 4 # 当前是第几年使用 # 使用传入的参考日期,如果没有则使用当前时间 today = reference_date if reference_date is not None else datetime.today() # 读取 CIP 周期数据 cip_df = read_cip_cycles("CIP_cycles_with_days.csv") # 匹配去年周期天数(带 >=45 天的筛选逻辑) last_year_days = find_last_year_cycle(cip_df, unit_id, today, min_days=45) print(f"去年对应周期天数: {last_year_days}") # 计算折旧后的周期天数(基于“去年值 → 今年值”的公式) current_days = depreciate_from_previous_year( last_year_days=last_year_days, membrane_life_years=membrane_life_years, final_reduction_rate=final_reduction_rate, current_year=current_year ) return current_days # ===== 主程序 ===== if __name__ == "__main__": # 用户输入 unit_id = int(input("请输入机组编号: ")) # 计算折旧后的周期天数(基于“去年值 → 今年值”的公式) current_days = main(unit_id) print(f"膜折旧后的周期天数: {current_days}")