| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- import pandas as pd
- from datetime import datetime, timedelta
- import warnings
- import os
- warnings.filterwarnings('ignore')
- def read_cip_cycles(file_path="CIP_cycles_with_days.csv"):
- """
- 读取 CIP 周期 CSV 文件
- 返回 DataFrame: unit_id, cycle_start, cycle_end, cycle_days
- """
- file_path = os.path.join(os.path.dirname(__file__), file_path)
- df = pd.read_csv(file_path, parse_dates=['cycle_start', 'cycle_end'])
- df = df.sort_values(['unit_id', 'cycle_start']).reset_index(drop=True)
- return df
- def find_last_year_cycle(cip_df, unit_id, target_date, min_days=45, max_days=120):
- """
- 根据目标日期匹配去年 CIP 周期天数,并保证周期天数在[min_days, max_days]范围内
- 参数:
- cip_df: CIP 周期 DataFrame
- unit_id: 用户指定机组
- target_date: datetime 对象(今年日期)
- min_days: 最小周期天数
- max_days: 最大周期天数
- 返回:
- 对应去年周期天数 (int)
- """
- last_year = target_date.year - 1
- last_year_date = datetime(last_year, target_date.month, target_date.day)
- # 筛选该机组去年周期
- unit_df = cip_df[cip_df['unit_id'] == unit_id]
- last_year_df = unit_df[(unit_df['cycle_start'].dt.year <= last_year) &
- (unit_df['cycle_end'].dt.year >= last_year)]
- if last_year_df.empty:
- raise ValueError(f"机组 {unit_id} 去年没有 CIP 周期记录")
- # 先尝试匹配包含日期的周期,且在范围内
- for _, row in last_year_df.iterrows():
- if row['cycle_start'].date() <= last_year_date.date() <= row['cycle_end'].date():
- if min_days <= row['cycle_days'] <= max_days:
- return row['cycle_days']
- # 如果没有匹配,按距离排序找最近周期
- last_year_df['distance'] = last_year_df.apply(
- lambda r: min(abs((r['cycle_start'].date() - last_year_date.date()).days),
- abs((r['cycle_end'].date() - last_year_date.date()).days)),
- axis=1
- )
- # 按距离从小到大排序,依次找符合天数范围的周期
- last_year_df_sorted = last_year_df.sort_values('distance')
- for _, row in last_year_df_sorted.iterrows():
- if min_days <= row['cycle_days'] <= max_days:
- return row['cycle_days']
- # 如果都不符合,报错
- raise ValueError(f"机组 {unit_id} 去年没有符合 {min_days}-{max_days} 天的 CIP 周期")
- def depreciate_from_previous_year(last_year_days,
- membrane_life_years,
- final_reduction_rate,
- current_year,
- previous_year=None):
- """
- 根据去年的已折旧周期计算今年的周期(假设每年按线性增量缩短,最终累计为 final_reduction_rate)
- 参数:
- last_year_days: 去年观测到的周期天数(不是原始第1年的值)
- membrane_life_years: 膜总寿命(年)
- final_reduction_rate: 最后一年相对于原始的累计缩短率(例如 0.4)
- current_year: 膜的当前使用年份(从1开始)
- previous_year: 已有周期对应的膜使用年份索引(若为 None,则默认 previous_year = current_year - 1)
- 返回:
- 折旧后的周期天数(int)
- """
- if previous_year is None:
- previous_year = current_year - 1
- if previous_year < 1 or current_year < 1:
- raise ValueError("年份索引必须 >= 1")
- if membrane_life_years <= 1:
- raise ValueError("membrane_life_years 必须大于 1")
- # 每年的缩短比例(线性分摊到 membrane_life_years-1 年)
- annual_reduction = final_reduction_rate / (membrane_life_years - 1)
- # 去年、今年相对于原始的累计缩短率
- r_prev = annual_reduction * (previous_year - 1)
- r_curr = annual_reduction * (current_year - 1)
- # 安全检查,避免除以 0 或负值
- if (1 - r_prev) <= 0:
- raise ValueError("去年累计缩短率使得 (1 - r_prev) <= 0,无法反推原始值。检查参数。")
- if r_curr >= 1:
- raise ValueError("今年累计缩短率 >=100%,结果不合理。检查参数。")
- # 公式:D_current = D_prev * (1 - r_curr) / (1 - r_prev)
- current_days = last_year_days * (1 - r_curr) / (1 - r_prev)
- return int(round(current_days))
- def main(unit_id, reference_date=None):
- membrane_life_years = 5
- final_reduction_rate = 0.4
- current_year = 4 # 当前是第几年使用
- # 使用传入的参考日期,如果没有则使用当前时间
- today = reference_date if reference_date is not None else datetime.today()
- # 读取 CIP 周期数据
- cip_df = read_cip_cycles("CIP_cycles_with_days.csv")
- # 匹配去年周期天数(带 >=45 天的筛选逻辑)
- last_year_days = find_last_year_cycle(cip_df, unit_id, today, min_days=45)
- print(f"去年对应周期天数: {last_year_days}")
- # 计算折旧后的周期天数(基于“去年值 → 今年值”的公式)
- current_days = depreciate_from_previous_year(
- last_year_days=last_year_days,
- membrane_life_years=membrane_life_years,
- final_reduction_rate=final_reduction_rate,
- current_year=current_year
- )
- return current_days
- # ===== 主程序 =====
- if __name__ == "__main__":
- # 用户输入
- unit_id = int(input("请输入机组编号: "))
- # 计算折旧后的周期天数(基于“去年值 → 今年值”的公式)
- current_days = main(unit_id)
- print(f"膜折旧后的周期天数: {current_days}")
|