causal_structure.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # -*- coding: utf-8 -*-
  2. """
  3. causal_structure.py: 第二层 - 物理因果结构构建
  4. 该模块负责将专家知识库(如工艺层级划分、设备归属)转化为图结构(邻接矩阵)。
  5. """
  6. import numpy as np
  7. import pandas as pd
  8. from config import config
  9. class CausalStructureBuilder:
  10. def __init__(self, threshold_df):
  11. self.df = threshold_df
  12. self.sensor_list = self.df['ID'].tolist()
  13. self.id_to_idx = {name: i for i, name in enumerate(self.sensor_list)}
  14. self.num_sensors = len(self.sensor_list)
  15. self.col_layer = self._find_col_by_keyword(config.KEYWORD_LAYER)
  16. self.col_device = self._find_col_by_keyword(config.KEYWORD_DEVICE)
  17. def _find_col_by_keyword(self, keyword):
  18. if keyword in self.df.columns: return keyword
  19. for col in self.df.columns:
  20. if col.lower() == keyword.lower(): return col
  21. raise ValueError(f"错误: 未找到列名包含 '{keyword}' 的列")
  22. def build(self):
  23. """
  24. 核心构建逻辑:基于规则生成传感器之间的有向连接关系(邻接矩阵)
  25. 返回值包含:传感器列表、索引映射字典、邻接矩阵(adj_matrix)
  26. """
  27. # 初始化 N x N 的全零矩阵,0 表示无连接,1 表示有连接
  28. adj_matrix = np.zeros((self.num_sensors, self.num_sensors), dtype=int)
  29. nodes_info = {}
  30. # 1. 遍历解析所有节点的属性字典
  31. for _, row in self.df.iterrows():
  32. d_val = row[self.col_device]
  33. # 清洗设备名:处理空值、NaN 等异常输入
  34. dev_id = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None
  35. # 清洗层级号:如果层级未定义或填写错误,赋予 -1 表示该节点不参与因果溯源
  36. try: l_val = int(row[self.col_layer])
  37. except: l_val = -1
  38. nodes_info[row['ID']] = {'layer': l_val, 'device': dev_id}
  39. count_edges = 0
  40. # 2. 嵌套循环对比每一对传感器,判断它们之间是否存在“因果通路”
  41. for i, src_name in enumerate(self.sensor_list):
  42. src_node = nodes_info.get(src_name)
  43. # 如果起始节点没有定义有效层级,则跳过
  44. if not src_node or src_node['layer'] == -1: continue
  45. src_l, src_d = src_node['layer'], src_node['device']
  46. for j, dst_name in enumerate(self.sensor_list):
  47. # 排除自身到自身的连接(防止图遍历时陷入死循环)
  48. if i == j: continue
  49. dst_node = nodes_info.get(dst_name)
  50. # 如果目标节点没有定义有效层级,则跳过
  51. if not dst_node or dst_node['layer'] == -1: continue
  52. dst_l, dst_d = dst_node['layer'], dst_node['device']
  53. # ==================== (A) 层级约束 (Layer Constraint) ====================
  54. # 水厂工艺是从上游传导到下游的。溯源方向是由下到上。
  55. # dst_l == src_l: 允许在同层级(例如同一环节的不同传感器)平移寻找
  56. # dst_l == src_l - 1: 允许向上一级(上游环节)寻找原因。绝不允许越级或向下游找。
  57. is_layer_valid = (dst_l == src_l) or (dst_l == src_l - 1)
  58. if not is_layer_valid: continue
  59. # ==================== (B) 设备约束 (Device Constraint) ====================
  60. # 如果两个传感器明确归属于不同的具体设备(如一个是 RO1 膜,一个是 RO2 膜),
  61. # 则判定它们之间物理隔离,不存在因果关系,切断连接。
  62. is_dev_valid = True
  63. if (src_d is not None) and (dst_d is not None):
  64. if src_d != dst_d: is_dev_valid = False
  65. # 如果同时满足 层级约束 和 设备约束,则判定为有效通路
  66. if is_dev_valid:
  67. adj_matrix[i, j] = 1
  68. count_edges += 1
  69. return {"sensor_list": self.sensor_list, "sensor_to_idx": self.id_to_idx, "adj_matrix": adj_matrix}