find_cip.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. import pandas as pd
  2. import os
  3. from datetime import datetime, timedelta
  4. import numpy as np
  5. from functools import reduce
  6. # =========================
  7. # 全局数据路径(统一入口)
  8. # =========================
  9. DATA_PATH = "../datasets/high_freq_cip"
  10. # =========================
  11. # 全局数据格式
  12. # =========================
  13. UNIT_MAP = {
  14. 1: "RO1",
  15. 2: "RO2",
  16. 3: "RO3",
  17. 4: "RO4"
  18. }
  19. # =========================
  20. # 方法1:基于控制逻辑(强规则)
  21. # =========================
  22. def read_raw_signal(file_path, col_name):
  23. """
  24. 读取单变量CSV(列名就是变量名)
  25. 输入:
  26. file_path: CSV路径
  27. col_name: 重命名后的标准变量名(model / qxb1 等)
  28. """
  29. df = pd.read_csv(file_path)
  30. df['time'] = pd.to_datetime(df['time'])
  31. # 找到除 time 之外的唯一数据列
  32. value_cols = [c for c in df.columns if c != 'time']
  33. if len(value_cols) != 1:
  34. raise ValueError(f"❌ 文件列结构异常(应只有1个数据列): {file_path} | 实际列: {value_cols}")
  35. value_col = value_cols[0]
  36. # 重命名为统一字段(非常关键)
  37. df = df[['time', value_col]].rename(columns={value_col: col_name})
  38. return df
  39. def read_method1_signals(unit_id):
  40. """
  41. 读取方法1所需的所有控制逻辑信号
  42. 包括:
  43. - model_word(运行模式)
  44. - CIP泵状态(QXB1/2)
  45. - 阀门反馈(JSF1/2)
  46. 注意:
  47. - 每个信号是独立CSV(非连续记录)
  48. - 可能存在缺失文件(用exists保护)
  49. """
  50. base_path = DATA_PATH
  51. files = {
  52. "model": f"{base_path}/C_M_RO{unit_id}_DB_model_word_raw.csv",
  53. "qxb1": f"{base_path}/C_M_CIP_QXB1_run_raw.csv",
  54. "qxb2": f"{base_path}/C_M_CIP_QXB2_run_raw.csv",
  55. "jsf1": f"{base_path}/C_M_RO{unit_id}_CIP_JSF1_open_feedback_raw.csv",
  56. "jsf2": f"{base_path}/C_M_RO{unit_id}_CIP_JSF2_open_feedback_raw.csv",
  57. }
  58. dfs = []
  59. for key, path in files.items():
  60. if os.path.exists(path):
  61. dfs.append(read_raw_signal(path, key))
  62. return dfs
  63. def merge_signals(dfs):
  64. """
  65. 多信号时间对齐 + 前向填充
  66. 原理:
  67. - outer join 合并时间轴
  68. - ffill 将“状态保持”转为连续时间序列
  69. 关键假设:
  70. 控制信号在未更新时保持不变(工业数据常见)
  71. """
  72. df_merged = reduce(
  73. lambda left, right: pd.merge(left, right, on='time', how='outer'),
  74. dfs
  75. )
  76. df_merged = df_merged.sort_values('time')
  77. df_merged = df_merged.ffill()
  78. return df_merged
  79. def get_method1_dates(unit_id):
  80. """
  81. 方法1核心:
  82. 基于控制逻辑判断某天是否发生CIP
  83. 判定条件(逐时刻):
  84. model == 0
  85. AND (QXB1==1 OR QXB2==1)
  86. AND (JSF1==1 OR JSF2==1)
  87. 日级别判定:
  88. 当天只要出现一次 → 即判定为CIP日
  89. 输出:
  90. set(date)
  91. """
  92. print(f"方法1处理 {unit_id} 号机组...")
  93. dfs = read_method1_signals(unit_id)
  94. if not dfs:
  95. return set()
  96. df = merge_signals(dfs)
  97. df['cip_flag_m1'] = (
  98. (df['model'] == 0) &
  99. ((df['qxb1'] == 1) | (df['qxb2'] == 1)) &
  100. ((df['jsf1'] == 1) | (df['jsf2'] == 1))
  101. )
  102. df['date'] = df['time'].dt.date
  103. # 任意时刻满足 → 当天为CIP
  104. daily_flag = df.groupby('date')['cip_flag_m1'].any()
  105. return set(daily_flag[daily_flag].index)
  106. # =========================
  107. # 方法2:基于运行行为(弱规则)
  108. # =========================
  109. def read_control_word_data(unit_id):
  110. """
  111. 读取控制字(高频离散)
  112. 用于检测“异常运行段”
  113. """
  114. file_path = f'{DATA_PATH}/C_M_RO{unit_id}_DB_control_word_raw.csv'
  115. df = pd.read_csv(file_path)
  116. df['time'] = pd.to_datetime(df['time'])
  117. df = df.sort_values('time')
  118. df['date'] = df['time'].dt.date
  119. return df
  120. def read_pressure_data():
  121. """
  122. 读取1分钟级压力数据
  123. 用于识别低压运行(CIP特征)
  124. """
  125. file_path = f'{DATA_PATH}/cip_sensors_1min_merged.csv'
  126. df = pd.read_csv(file_path)
  127. df['time'] = pd.to_datetime(df['time'])
  128. df = df.sort_values('time')
  129. df['date'] = df['time'].dt.date
  130. return df
def find_control_word_events(df, unit_id):
    """Detect "abnormal control-word segments" for one unit.

    Rule:
        - control word != 24 (24 presumably encodes normal operation, per
          the original design notes — TODO confirm against PLC docs)
        - the segment lasts more than 3 hours (180 minutes)

    Args:
        df: output of read_control_word_data — sorted by 'time', with a
            'date' column and a 'C.M.RO{unit_id}_DB@control_word' column.
        unit_id: RO unit number, used to build the column name.

    Returns:
        (result_days, control_events_by_date):
            result_days: dates with at least one abnormal segment
            control_events_by_date: date -> list of event dicts
                (start_time / end_time / duration_minutes / control_word /
                is_initial_segment)

    NOTE(review): a non-24 segment still open at the day's last record is
    only measured from that last record to midnight; its in-day portion
    (current_start_time -> last record) is never emitted — confirm intended.
    """
    result_days = []
    control_events_by_date = {}
    dates = sorted(df['date'].unique())
    # Last control word seen on the previous processed day; lets a segment
    # extend across midnight into the current day's leading gap.
    prev_day_last_control_word = None
    for i, current_date in enumerate(dates):
        day_data = df[df['date'] == current_date].copy()
        day_data = day_data.sort_values('time').reset_index(drop=True)
        day_events = []
        # ===== Leading cross-midnight segment =====
        # The span from 00:00 to the first record is attributed to the
        # previous day's final control word (0 on the very first day).
        first_record_time = day_data.iloc[0]['time']
        first_record_control_word = day_data.iloc[0][f'C.M.RO{unit_id}_DB@control_word']
        if i == 0:
            first_segment_control_word = 0
        else:
            first_segment_control_word = prev_day_last_control_word
        day_start = datetime.combine(current_date, datetime.min.time())
        first_segment_duration = (first_record_time - day_start).total_seconds() / 60
        if (first_segment_duration > 180 and
                first_segment_control_word != 24.0 and
                first_segment_control_word is not None):
            day_events.append({
                'start_time': day_start,
                'end_time': first_record_time,
                'duration_minutes': first_segment_duration,
                'control_word': first_segment_control_word,
                'is_initial_segment': True
            })
        # ===== Middle segments: a segment closes whenever the value changes =====
        current_control_word = first_record_control_word
        current_start_time = first_record_time
        for j in range(1, len(day_data)):
            current_time = day_data.iloc[j]['time']
            current_control = day_data.iloc[j][f'C.M.RO{unit_id}_DB@control_word']
            if current_control != current_control_word:
                duration = (current_time - current_start_time).total_seconds() / 60
                if duration > 180 and current_control_word != 24.0:
                    day_events.append({
                        'start_time': current_start_time,
                        'end_time': current_time,
                        'duration_minutes': duration,
                        'control_word': current_control_word,
                        'is_initial_segment': False
                    })
                current_control_word = current_control
                current_start_time = current_time
        # ===== Trailing segment: gap from the last record to end of day =====
        last_record_time = day_data.iloc[-1]['time']
        last_control_word = day_data.iloc[-1][f'C.M.RO{unit_id}_DB@control_word']
        day_end = datetime.combine(current_date, datetime.max.time()).replace(hour=23, minute=59, second=59)
        last_segment_duration = (day_end - last_record_time).total_seconds() / 60
        if (last_segment_duration > 180 and last_control_word != 24.0):
            day_events.append({
                'start_time': last_record_time,
                'end_time': day_end,
                'duration_minutes': last_segment_duration,
                'control_word': last_control_word,
                'is_initial_segment': False
            })
        prev_day_last_control_word = last_control_word
        if day_events:
            result_days.append(current_date)
            control_events_by_date[current_date] = day_events
    return result_days, control_events_by_date
def find_pressure_events(pressure_df, target_dates, unit_id):
    """Detect low-pressure events (a CIP signature) on the given dates.

    Rule:
        - inlet pressure within [0.05, 0.5]
        - sustained for >= 20 minutes; duration is minutes between the
          first and last valid sample plus 1, which assumes 1-minute
          sampling — TODO confirm sampling rate
        - up to 5 consecutive out-of-range samples are tolerated inside an
          event; a 6th closes it

    Args:
        pressure_df: merged 1-minute sensor frame with 'time', 'date' and
            a 'C.M.RO{unit_id}_PT_JS@out' pressure column.
        target_dates: iterable of dates to scan (method-2 candidate days).
        unit_id: RO unit number, used to build the pressure column name.

    Returns:
        dict: date -> list of event dicts (start_time / end_time /
        duration_minutes / avg_pressure over the valid samples in window).
    """
    pressure_events = {}
    for date in target_dates:
        day_data = pressure_df[pressure_df['date'] == date].copy()
        if len(day_data) == 0:
            continue
        day_data = day_data.sort_values('time')
        day_data['pressure_valid'] = day_data[f'C.M.RO{unit_id}_PT_JS@out'].between(0.05, 0.5)
        events = []
        current_streak = 0  # valid samples counted in the currently open event
        gap_count = 0  # consecutive invalid samples inside the open event
        event_start = None
        event_end = None
        for i, row in day_data.iterrows():
            if row['pressure_valid']:
                if current_streak == 0:
                    # First valid sample opens a new event.
                    event_start = row['time']
                current_streak += 1
                gap_count = 0
                event_end = row['time']
            else:
                if current_streak > 0:
                    gap_count += 1
                    # More than 5 invalid samples in a row closes the event.
                    if gap_count > 5:
                        duration = (event_end - event_start).total_seconds() / 60 + 1
                        if duration >= 20:
                            events.append({
                                'start_time': event_start,
                                'end_time': event_end,
                                'duration_minutes': duration,
                                # Mean pressure over valid samples in the window.
                                'avg_pressure': day_data.loc[
                                    (day_data['time'] >= event_start) &
                                    (day_data['time'] <= event_end) &
                                    day_data['pressure_valid']
                                ][f'C.M.RO{unit_id}_PT_JS@out'].mean()
                            })
                        current_streak = 0
                        gap_count = 0
        # Close the event still open at the end of the day, if long enough.
        if current_streak > 0:
            duration = (event_end - event_start).total_seconds() / 60 + 1
            if duration >= 20:
                events.append({
                    'start_time': event_start,
                    'end_time': event_end,
                    'duration_minutes': duration,
                    'avg_pressure': day_data.loc[
                        (day_data['time'] >= event_start) &
                        (day_data['time'] <= event_end) &
                        day_data['pressure_valid']
                    ][f'C.M.RO{unit_id}_PT_JS@out'].mean()
                })
        if events:
            pressure_events[date] = events
    return pressure_events
  268. def match_events(control_events, pressure_events):
  269. """
  270. 控制异常 × 低压异常 → 时间重叠
  271. 只有同时满足 → 才认为是“CIP行为”
  272. """
  273. matched_results = []
  274. for control_event in control_events:
  275. for pressure_event in pressure_events:
  276. overlap_start = max(control_event['start_time'], pressure_event['start_time'])
  277. overlap_end = min(control_event['end_time'], pressure_event['end_time'])
  278. if overlap_start < overlap_end:
  279. overlap_duration = (overlap_end - overlap_start).total_seconds() / 60
  280. if overlap_duration > 0:
  281. matched_results.append({
  282. 'overlap_start': overlap_start,
  283. 'overlap_end': overlap_end,
  284. 'overlap_duration_minutes': overlap_duration
  285. })
  286. return matched_results
  287. def get_method2_dates(unit_id):
  288. """
  289. 方法2最终输出:
  290. 返回满足“控制异常 + 压力异常重叠”的日期集合
  291. """
  292. print(f"方法2处理 {unit_id} 号机组...")
  293. control_df = read_control_word_data(unit_id)
  294. target_dates, control_events_by_date = find_control_word_events(control_df, unit_id)
  295. pressure_df = read_pressure_data()
  296. pressure_events_by_date = find_pressure_events(pressure_df, target_dates, unit_id)
  297. both_exist_dates = [d for d in target_dates if d in pressure_events_by_date]
  298. matched_dates = set()
  299. for date in both_exist_dates:
  300. control_events = control_events_by_date.get(date, [])
  301. pressure_events = pressure_events_by_date.get(date, [])
  302. matched_events = match_events(control_events, pressure_events)
  303. if matched_events:
  304. matched_dates.add(date)
  305. return matched_dates
  306. # =========================
  307. # 融合层
  308. # =========================
  309. def label_dates(method1_dates, method2_dates, unit_id):
  310. """
  311. 融合标注规则:
  312. 0 → 两种方法都检测到(高置信)
  313. 1 → 仅方法1
  314. 2 → 仅方法2
  315. """
  316. all_dates = sorted(method1_dates | method2_dates)
  317. results = []
  318. for d in all_dates:
  319. if d in method1_dates and d in method2_dates:
  320. label = 0
  321. elif d in method1_dates:
  322. label = 1
  323. else:
  324. label = 2
  325. results.append({
  326. "date": d,
  327. "label": label,
  328. "unit_id": unit_id
  329. })
  330. return pd.DataFrame(results)
  331. def analyze_unit(unit_id):
  332. """
  333. 单机组完整流程:
  334. 方法1 → 日期集合
  335. 方法2 → 日期集合
  336. 融合 → 标签
  337. """
  338. unit_name = UNIT_MAP[unit_id]
  339. print(f"\n处理 {unit_name} 号机组...")
  340. method1_dates = get_method1_dates(unit_id)
  341. method2_dates = get_method2_dates(unit_id)
  342. label_df = label_dates(method1_dates, method2_dates, unit_name)
  343. return label_df
  344. def main():
  345. all_results = []
  346. for unit_id in range(1, 5):
  347. df = analyze_unit(unit_id)
  348. all_results.append(df)
  349. final_df = pd.concat(all_results, ignore_index=True)
  350. final_df["unit_id"] = final_df["unit_id"].astype(str)
  351. final_df = final_df.sort_values(["unit_id", "date"])
  352. save_path = "../use_data/cip_day_labels_all_units.csv"
  353. final_df.to_csv(save_path, index=False)
  354. print(f"\n按天CIP标签已保存到 {save_path}")
  355. if __name__ == "__main__":
  356. main()