find_cip.py 15 KB


  1. import pandas as pd
  2. import os
  3. from datetime import datetime, timedelta
  4. import numpy as np
  5. def read_control_word_data(unit_id):
  6. """读取控制字数据"""
  7. file_path = f'datasets/RO_data_logs/high_freq_cip/C_M_RO{unit_id}_DB_control_word_raw.csv'
  8. df = pd.read_csv(file_path)
  9. df['time'] = pd.to_datetime(df['time'])
  10. df = df.sort_values('time')
  11. df['date'] = df['time'].dt.date
  12. return df
  13. def read_pressure_data():
  14. """读取进水压力数据"""
  15. file_path = 'datasets/RO_data_logs/high_freq_cip/cip_sensors_1min_merged.csv'
  16. df = pd.read_csv(file_path)
  17. df['time'] = pd.to_datetime(df['time'])
  18. df = df.sort_values('time')
  19. df['date'] = df['time'].dt.date
  20. return df
  21. def find_control_word_events(df, unit_id):
  22. """查找持续时间超过 3h且控制字不为24.0的控制字异常事件"""
  23. result_days = []
  24. control_events_by_date = {}
  25. # 按日期分组
  26. dates = sorted(df['date'].unique())
  27. # 存储前一天的最后控制字
  28. prev_day_last_control_word = None
  29. for i, current_date in enumerate(dates):
  30. # 获取当前日期的数据
  31. day_data = df[df['date'] == current_date].copy()
  32. day_data = day_data.sort_values('time').reset_index(drop=True)
  33. day_events = []
  34. # 处理当前日期的第一个事件段(00:00:00 - 第一个记录时间)
  35. first_record_time = day_data.iloc[0]['time']
  36. first_record_control_word = day_data.iloc[0][f'C.M.RO{unit_id}_DB@control_word']
  37. # 确定第一个事件段的控制字
  38. if i == 0: # 第一天,没有前一天
  39. first_segment_control_word = 0 # 按照指导,第一天初始为停机状态
  40. else:
  41. first_segment_control_word = prev_day_last_control_word
  42. # 计算第一个事件段的持续时间(00:00:00 到 第一个记录时间)
  43. day_start = datetime.combine(current_date, datetime.min.time())
  44. first_segment_duration = (first_record_time - day_start).total_seconds() / 60
  45. # 如果第一个事件段持续时间超过 3h且控制字不为24.0,则记录
  46. if (first_segment_duration > 180 and
  47. first_segment_control_word != 24.0 and
  48. first_segment_control_word is not None):
  49. day_events.append({
  50. 'start_time': day_start,
  51. 'end_time': first_record_time,
  52. 'duration_minutes': first_segment_duration,
  53. 'control_word': first_segment_control_word,
  54. 'is_initial_segment': True
  55. })
  56. # 处理当前日期内的其他事件段
  57. current_control_word = first_record_control_word
  58. current_start_time = first_record_time
  59. for j in range(1, len(day_data)):
  60. current_time = day_data.iloc[j]['time']
  61. current_control = day_data.iloc[j][f'C.M.RO{unit_id}_DB@control_word']
  62. if current_control != current_control_word:
  63. # 状态变化,记录前一个事件段
  64. duration = (current_time - current_start_time).total_seconds() / 60
  65. if duration > 180 and current_control_word != 24.0:
  66. day_events.append({
  67. 'start_time': current_start_time,
  68. 'end_time': current_time,
  69. 'duration_minutes': duration,
  70. 'control_word': current_control_word,
  71. 'is_initial_segment': False
  72. })
  73. current_control_word = current_control
  74. current_start_time = current_time
  75. # 处理最后一个事件段(最后一个记录时间到23:59:59)
  76. last_record_time = day_data.iloc[-1]['time']
  77. last_control_word = day_data.iloc[-1][f'C.M.RO{unit_id}_DB@control_word']
  78. day_end = datetime.combine(current_date, datetime.max.time()).replace(hour=23, minute=59, second=59)
  79. last_segment_duration = (day_end - last_record_time).total_seconds() / 60
  80. if (last_segment_duration > 180 and
  81. last_control_word != 24.0):
  82. day_events.append({
  83. 'start_time': last_record_time,
  84. 'end_time': day_end,
  85. 'duration_minutes': last_segment_duration,
  86. 'control_word': last_control_word,
  87. 'is_initial_segment': False
  88. })
  89. # 保存前一天的最后控制字
  90. prev_day_last_control_word = last_control_word
  91. # 如果有任何异常事件,添加到结果中
  92. if day_events:
  93. result_days.append(current_date)
  94. control_events_by_date[current_date] = day_events
  95. return result_days, control_events_by_date
  96. def find_pressure_events(pressure_df, target_dates, unit_id):
  97. """查找进水压力异常事件"""
  98. pressure_events = {}
  99. for date in target_dates:
  100. # 筛选指定日期的数据
  101. day_data = pressure_df[pressure_df['date'] == date].copy()
  102. if len(day_data) == 0:
  103. continue
  104. day_data = day_data.sort_values('time')
  105. day_data['pressure_valid'] = day_data[f'C.M.RO{unit_id}_PT_JS@out'].between(0.05, 0.5)
  106. events = []
  107. current_streak = 0
  108. gap_count = 0
  109. event_start = None
  110. event_end = None
  111. for i, row in day_data.iterrows():
  112. if row['pressure_valid']:
  113. if current_streak == 0:
  114. event_start = row['time']
  115. current_streak += 1
  116. gap_count = 0 # 重置间断计数
  117. event_end = row['time']
  118. else:
  119. if current_streak > 0:
  120. gap_count += 1
  121. if gap_count > 5: # 超过5分钟间断,结束当前事件
  122. duration = (event_end - event_start).total_seconds() / 60 + 1 # 加上最后1分钟
  123. if duration >= 20:
  124. events.append({
  125. 'start_time': event_start,
  126. 'end_time': event_end,
  127. 'duration_minutes': duration,
  128. 'avg_pressure': day_data.loc[
  129. (day_data['time'] >= event_start) &
  130. (day_data['time'] <= event_end) &
  131. day_data['pressure_valid']
  132. ][f'C.M.RO{unit_id}_PT_JS@out'].mean()
  133. })
  134. current_streak = 0
  135. gap_count = 0
  136. # 检查最后一段
  137. if current_streak > 0:
  138. duration = (event_end - event_start).total_seconds() / 60 + 1
  139. if duration >= 20:
  140. events.append({
  141. 'start_time': event_start,
  142. 'end_time': event_end,
  143. 'duration_minutes': duration,
  144. 'avg_pressure': day_data.loc[
  145. (day_data['time'] >= event_start) &
  146. (day_data['time'] <= event_end) &
  147. day_data['pressure_valid']
  148. ][f'C.M.RO{unit_id}_PT_JS@out'].mean()
  149. })
  150. if events:
  151. pressure_events[date] = events
  152. return pressure_events
  153. def match_events(control_events, pressure_events):
  154. """匹配控制字异常和进水压力异常的时间段"""
  155. matched_results = []
  156. for control_event in control_events:
  157. for pressure_event in pressure_events:
  158. # 计算时间段的交集
  159. overlap_start = max(control_event['start_time'], pressure_event['start_time'])
  160. overlap_end = min(control_event['end_time'], pressure_event['end_time'])
  161. # 如果有交集且交集时间大于0
  162. if overlap_start < overlap_end:
  163. overlap_duration = (overlap_end - overlap_start).total_seconds() / 60
  164. if overlap_duration > 0:
  165. matched_results.append({
  166. 'control_start': control_event['start_time'],
  167. 'control_end': control_event['end_time'],
  168. 'pressure_start': pressure_event['start_time'],
  169. 'pressure_end': pressure_event['end_time'],
  170. 'overlap_start': overlap_start,
  171. 'overlap_end': overlap_end,
  172. 'overlap_duration_minutes': overlap_duration,
  173. 'control_word': control_event['control_word'],
  174. 'avg_pressure': pressure_event['avg_pressure']
  175. })
  176. return matched_results
  177. def calculate_total_matched_time(matched_events):
  178. """计算匹配的总时长,避免重复计算重叠时间"""
  179. if not matched_events:
  180. return 0, []
  181. # 合并重叠的时间段
  182. time_intervals = []
  183. for event in matched_events:
  184. time_intervals.append((event['overlap_start'], event['overlap_end']))
  185. # 按开始时间排序
  186. time_intervals.sort(key=lambda x: x[0])
  187. # 合并重叠区间
  188. merged_intervals = []
  189. current_start, current_end = time_intervals[0]
  190. for start, end in time_intervals[1:]:
  191. if start <= current_end:
  192. current_end = max(current_end, end)
  193. else:
  194. merged_intervals.append((current_start, current_end))
  195. current_start, current_end = start, end
  196. merged_intervals.append((current_start, current_end))
  197. # 计算总时长
  198. total_duration = 0
  199. for start, end in merged_intervals:
  200. total_duration += (end - start).total_seconds() / 60
  201. return total_duration, merged_intervals
  202. def analyze_both_exist_but_no_match(control_events, pressure_events, date):
  203. """分析同时存在两种异常但时间段不重叠的原因"""
  204. reasons = []
  205. reasons.append("控制字异常和进水压力异常同时存在,但时间段没有重叠")
  206. # 详细列出控制字异常事件
  207. reasons.append("控制字异常事件详情:")
  208. for i, event in enumerate(control_events, 1):
  209. segment_type = " (初始段)" if event.get('is_initial_segment', False) else ""
  210. reasons.append(
  211. f" 事件{i}: {event['start_time']} - {event['end_time']} (时长: {event['duration_minutes']:.1f}分钟, 控制字: {event['control_word']}{segment_type})")
  212. # 详细列出进水压力异常事件
  213. reasons.append("进水压力异常事件详情:")
  214. for i, event in enumerate(pressure_events, 1):
  215. reasons.append(
  216. f" 事件{i}: {event['start_time']} - {event['end_time']} (时长: {event['duration_minutes']:.1f}分钟, 平均压力: {event['avg_pressure']:.3f}MPa)")
  217. # 分析时间范围
  218. control_start_min = min(event['start_time'] for event in control_events)
  219. control_end_max = max(event['end_time'] for event in control_events)
  220. pressure_start_min = min(event['start_time'] for event in pressure_events)
  221. pressure_end_max = max(event['end_time'] for event in pressure_events)
  222. reasons.append(f"控制字异常总体时间范围: {control_start_min.time()} - {control_end_max.time()}")
  223. reasons.append(f"进水压力异常总体时间范围: {pressure_start_min.time()} - {pressure_end_max.time()}")
  224. # 检查是否有时间重叠的可能性
  225. if control_end_max < pressure_start_min:
  226. reasons.append("时间关系: 所有控制字异常都在进水压力异常之前结束")
  227. elif pressure_end_max < control_start_min:
  228. reasons.append("时间关系: 所有进水压力异常都在控制字异常之前结束")
  229. else:
  230. reasons.append("时间关系: 异常事件在时间上有交错,但没有具体时间段重叠")
  231. return reasons
  232. def mark_cip_events(events, unit_id):
  233. """标记CIP事件的开始和结束"""
  234. # 整理事件数据
  235. event_data = []
  236. for date, event_list in events.items():
  237. for event in event_list:
  238. event_data.append({
  239. 'date': pd.to_datetime(date),
  240. 'duration_minutes': event['duration_minutes'],
  241. })
  242. df = pd.DataFrame(event_data)
  243. if df.empty:
  244. return pd.DataFrame(columns=["start_date", "end_date", "unit_id"])
  245. df = df.groupby("date", as_index=False)['duration_minutes'].sum().sort_values("date")
  246. cip_records = []
  247. i = 0
  248. while i < len(df):
  249. current_date = df.loc[i, 'date']
  250. # 计算从当前日开始的后 10 天累计时长
  251. end_window = current_date + timedelta(days=10)
  252. window_df = df[(df['date'] >= current_date) & (df['date'] <= end_window)]
  253. total_duration = window_df['duration_minutes'].sum()
  254. if total_duration > 100:
  255. # CIP 开始日
  256. start_date = current_date
  257. # CIP 结束日 = 窗口中最后一天有事件的日期
  258. end_date = window_df['date'].max()
  259. cip_records.append({
  260. "start_date": start_date,
  261. "end_date": end_date,
  262. "unit_id": unit_id
  263. })
  264. # 跳到结束日之后,避免重复计入
  265. i = df.index[df['date'] > end_date].min() if (df['date'] > end_date).any() else len(df)
  266. else:
  267. i += 1
  268. return pd.DataFrame(cip_records)
  269. def analyze_unit(unit_id):
  270. """
  271. 分析单个机组:
  272. - 查找控制字异常 & 压力异常
  273. - 匹配异常事件
  274. - 对匹配成功的事件进行 CIP 筛选
  275. - 返回 CIP 事件 DataFrame
  276. """
  277. print(f"\n正在处理 {unit_id} 号机组...")
  278. # 读取控制字数据
  279. control_df = read_control_word_data(unit_id)
  280. target_dates, control_events_by_date = find_control_word_events(control_df, unit_id)
  281. # 读取压力数据
  282. pressure_df = read_pressure_data()
  283. pressure_events_by_date = find_pressure_events(pressure_df, target_dates, unit_id)
  284. # 找出两种异常都存在的日期
  285. both_exist_dates = [d for d in target_dates if d in pressure_events_by_date]
  286. # 匹配结果收集
  287. matched_control_events = {}
  288. matched_pressure_events = {}
  289. for date in both_exist_dates:
  290. control_events = control_events_by_date.get(date, [])
  291. pressure_events = pressure_events_by_date.get(date, [])
  292. matched_events = match_events(control_events, pressure_events)
  293. if matched_events: # 只保留匹配成功的
  294. matched_control_events[date] = control_events
  295. matched_pressure_events[date] = pressure_events
  296. # 只对匹配成功的日期做 CIP 判断
  297. combined_events = {**matched_control_events, **matched_pressure_events}
  298. cip_df = mark_cip_events(combined_events, unit_id)
  299. return cip_df
  300. def main():
  301. """
  302. - 依次处理 1–4 号机组
  303. - 合并所有机组的 CIP 事件
  304. - 保存到一个 CSV 文件
  305. """
  306. all_cip_dfs = []
  307. for unit_id in range(1, 5):
  308. cip_df = analyze_unit(unit_id)
  309. all_cip_dfs.append(cip_df)
  310. # 合并结果
  311. final_cip_df = pd.concat(all_cip_dfs, ignore_index=True).sort_values(['unit_id', 'start_date'])
  312. # 保存到 CSV
  313. save_path = "CIP_events_all_units.csv"
  314. final_cip_df.to_csv(save_path, index=False)
  315. print(f"\n所有机组的 CIP 清洗事件区间已保存到 {save_path}")
  316. if __name__ == "__main__":
  317. main()