# -*- coding: utf-8 -*-
"""causal_structure.py: Layer 2 - physical causal structure construction."""
import numpy as np
import pandas as pd

from config import config


class CausalStructureBuilder:
    """Build a sensor-to-sensor adjacency matrix from a threshold table.

    The table must have an ``ID`` column plus a layer column and a device
    column, whose names are resolved from ``config.KEYWORD_LAYER`` and
    ``config.KEYWORD_DEVICE``.
    """

    # Layer value assigned to rows whose layer cell cannot be converted to an
    # int; sensors carrying this value never take part in any edge.
    _INVALID_LAYER = -1

    def __init__(self, threshold_df):
        """Index the sensors of ``threshold_df`` and resolve its columns.

        Args:
            threshold_df: DataFrame with an ``ID`` column and the layer and
                device columns named by ``config``.

        Raises:
            ValueError: If the layer or device column cannot be found.
        """
        self.df = threshold_df
        self.sensor_list = self.df['ID'].tolist()
        # With duplicated IDs the last occurrence wins in this mapping.
        self.id_to_idx = {name: i for i, name in enumerate(self.sensor_list)}
        self.num_sensors = len(self.sensor_list)
        self.col_layer = self._find_col_by_keyword(config.KEYWORD_LAYER)
        self.col_device = self._find_col_by_keyword(config.KEYWORD_DEVICE)

    def _find_col_by_keyword(self, keyword):
        """Return the column of ``self.df`` whose name matches ``keyword``.

        An exact match is preferred; otherwise the first column whose name
        equals ``keyword`` ignoring case is returned.

        Raises:
            ValueError: If no column matches.
        """
        if keyword in self.df.columns:
            return keyword
        for col in self.df.columns:
            if col.lower() == keyword.lower():
                return col
        raise ValueError(f"错误: 未找到列名包含 '{keyword}' 的列")

    @classmethod
    def _parse_layer(cls, value):
        """Convert a layer cell to ``int``; return the invalid marker on failure."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            # None / non-numeric text / NaN / infinity: treat as "no layer".
            return cls._INVALID_LAYER

    @staticmethod
    def _parse_device(value):
        """Return the stripped device id as ``str``, or ``None`` if missing/blank."""
        if pd.notna(value):
            text = str(value).strip()
            if text:
                return text
        return None

    def build(self):
        """Build the adjacency matrix over ``self.sensor_list``.

        ``adj_matrix[i, j]`` is set to 1 (source ``i``, destination ``j``,
        ``i != j``) when both sensors have a valid layer, the destination
        layer equals the source layer or the source layer minus one, and the
        two device ids are equal or at least one of them is missing.

        Returns:
            dict with keys ``"sensor_list"``, ``"sensor_to_idx"`` and
            ``"adj_matrix"`` (a square ``numpy`` integer array).
        """
        adj_matrix = np.zeros((self.num_sensors, self.num_sensors), dtype=int)

        # Per-ID metadata; a later row with the same ID overwrites an earlier one.
        nodes_info = {}
        for _, row in self.df.iterrows():
            nodes_info[row['ID']] = {
                'layer': self._parse_layer(row[self.col_layer]),
                'device': self._parse_device(row[self.col_device]),
            }

        # Resolve each sensor's metadata once instead of once per (src, dst) pair.
        nodes = [nodes_info.get(name) for name in self.sensor_list]

        for i, src_node in enumerate(nodes):
            if not src_node or src_node['layer'] == self._INVALID_LAYER:
                continue
            src_l, src_d = src_node['layer'], src_node['device']

            for j, dst_node in enumerate(nodes):
                if i == j:
                    continue
                if not dst_node or dst_node['layer'] == self._INVALID_LAYER:
                    continue
                dst_l, dst_d = dst_node['layer'], dst_node['device']

                # Layer rule: same layer, or destination one layer below.
                if dst_l != src_l and dst_l != src_l - 1:
                    continue
                # Device rule: only a conflict when both ids are known and differ.
                if src_d is not None and dst_d is not None and src_d != dst_d:
                    continue

                adj_matrix[i, j] = 1

        return {
            "sensor_list": self.sensor_list,
            "sensor_to_idx": self.id_to_idx,
            "adj_matrix": adj_matrix,
        }