| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import numpy as np
- import pandas as pd
# =============================
# Event identification and segmentation
# =============================
class UFEventClassifier:
    """Label rows by control-code range and group inlet rows into segments.

    Each ``*_codes`` argument is a ``(min, max)`` pair; both bounds are
    inclusive when matched against the values in ``ctrl_col``.
    """

    def __init__(self, unit_name, inlet_codes, physical_codes, chemical_codes, ctrl_col):
        """Store the unit name, the three inclusive code ranges and the control column.

        Args:
            unit_name: Identifier stored as ``self.unit``.
            inlet_codes: ``(min, max)`` control codes labelled ``"inlet"``.
            physical_codes: ``(min, max)`` control codes labelled ``"bw_phys"``.
            chemical_codes: ``(min, max)`` control codes labelled ``"bw_chem"``.
            ctrl_col: Name of the column holding the control code.
        """
        self.unit = unit_name
        self.inlet_min, self.inlet_max = inlet_codes
        self.physical_min, self.physical_max = physical_codes
        self.chemical_min, self.chemical_max = chemical_codes
        self.ctrl_col = ctrl_col

    def classify(self, df):
        """Return a copy of ``df`` with an ``event_type`` column added.

        Rows whose control code falls in none of the configured ranges are
        labelled ``"other"``. The input frame is not modified.
        """
        df = df.copy()
        ctrl = df[self.ctrl_col]
        df["event_type"] = "other"

        # Applied in this order on purpose: if ranges overlap, the later
        # rule wins (inlet < bw_phys < bw_chem).
        rules = (
            ("inlet", self.inlet_min, self.inlet_max),
            ("bw_phys", self.physical_min, self.physical_max),
            ("bw_chem", self.chemical_min, self.chemical_max),
        )
        for label, low, high in rules:
            df.loc[ctrl.between(low, high), "event_type"] = label
        return df

    def segment(self, df, inlet_types=None):
        """Group consecutive inlet rows into numbered segments.

        A segment is a maximal run of consecutive rows (in row order) whose
        ``event_type`` is one of ``inlet_types``. Segments are numbered from
        1 in order of appearance. Rows outside any segment are dropped.

        Args:
            df: Input frame containing an ``event_type`` column.
            inlet_types: Event types treated as inlet. Defaults to
                ``["inlet", "other"]``.

        Returns:
            A new frame holding only the inlet rows, with an integer
            ``segment_id`` column. The original index labels are preserved.
        """
        if inlet_types is None:
            inlet_types = ["inlet", "other"]
        elif isinstance(inlet_types, str):
            # A bare string would otherwise be treated as a container of
            # characters / substrings rather than a single event type.
            inlet_types = [inlet_types]

        # Work on positional numpy arrays so the result does not depend on
        # the frame's index labels (the index may be non-default, e.g. after
        # filtering, or a timestamp index).
        is_inlet = df["event_type"].isin(inlet_types).to_numpy(dtype=bool)

        previous = np.zeros_like(is_inlet)
        previous[1:] = is_inlet[:-1]

        # A segment starts wherever an inlet row follows a non-inlet row
        # (or is the very first row); the running count of starts is the id.
        starts = is_inlet & ~previous
        segment_ids = np.cumsum(starts)

        out = df.loc[is_inlet].copy()
        out["segment_id"] = segment_ids[is_inlet].astype(int)
        return out
class PostBackwashInletMarker:
    """Flag the first N inlet rows that follow a backwash event."""

    def __init__(self, n_points=10):
        """Configure the marker.

        Args:
            n_points: How many inlet rows after a backwash row get flagged.
        """
        self.n_points = n_points
        self.label_col = "post_bw_inlet"  # name of the boolean output column

    def mark(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with the boolean ``post_bw_inlet`` column.

        An inlet row is flagged when at least one backwash row
        (``bw_phys`` or ``bw_chem``) precedes it and it is among the first
        ``n_points`` inlet rows after the most recent backwash row. Rows of
        any other type in between neither count nor reset the window.

        Rows are processed in frame order, so the frame is expected to be
        sorted chronologically. The ``event_type`` column of the returned
        copy is normalised to stripped strings, with missing values
        replaced by ``''``. The input frame is not modified.
        """
        df = df.copy()

        # Normalise event_type. Missing values must be blanked based on the
        # ORIGINAL column: after astype(str) they would already have turned
        # into the literal strings 'nan' / 'None' and fillna would be a no-op.
        raw = df['event_type']
        cleaned = raw.astype(str).str.strip()
        df['event_type'] = cleaned.where(raw.notna(), '')

        df[self.label_col] = False
        if df.empty:
            return df

        is_bw = df['event_type'].isin(['bw_phys', 'bw_chem']).to_numpy(dtype=bool)
        is_inlet = (df['event_type'] == 'inlet').to_numpy(dtype=bool)

        # Position of the most recent backwash row at or before each row
        # (-1 while no backwash has been seen yet).
        positions = np.arange(len(df))
        last_bw = np.maximum.accumulate(np.where(is_bw, positions, -1))
        seen_bw = last_bw >= 0

        # Number of inlet rows strictly after the latest backwash row, up to
        # and including the current row. A single cumulative pass replaces
        # re-scanning the whole frame once per backwash row.
        inlet_count = np.cumsum(is_inlet)
        inlet_since_bw = inlet_count - inlet_count[np.clip(last_bw, 0, None)]

        df[self.label_col] = is_inlet & seen_bw & (inlet_since_bw <= self.n_points)
        return df
|