# -*- coding: utf-8 -*- """ rl_tracing.py: 强化学习链路级异常溯源 基于 PPO (Proximal Policy Optimization) 的 Actor-Critic 架构。 结合了专家经验的“行为克隆 (Imitation Learning)”与“自主探索 (Reinforcement Learning)”, 实现从“诱发变量”逆流而上寻找“根因变量”的智能寻路。 """ import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F from torch.distributions import Categorical import numpy as np import pandas as pd import os from tqdm import tqdm from config import config # ----------------- 1. 环境 ----------------- class CausalTracingEnv: """ 强化学习交互环境。 定义了 AI 智能体的状态(State)、动作空间(Action Space)以及奖励机制(Reward Function)。 """ def __init__(self, causal_graph, window_scores, threshold_df, expert_knowledge=None): self.sensor_list = causal_graph['sensor_list'] self.map = causal_graph['sensor_to_idx'] self.idx_to_sensor = {v: k for k, v in self.map.items()} self.adj = causal_graph['adj_matrix'] self.scores = window_scores # 专家历史异常链路知识库 self.expert_knowledge = expert_knowledge if expert_knowledge else {} self.num_sensors = len(self.sensor_list) # 解析每个传感器的层级 (Layer) 和归属设备 (Device) 属性,用于限制非法动作 self.node_props = {} col_one_layer = self._find_col(threshold_df, config.KEYWORD_LAYER) col_device = self._find_col(threshold_df, config.KEYWORD_DEVICE) df_indexed = threshold_df.set_index('ID') dict_one = df_indexed[col_one_layer].to_dict() if col_one_layer else {} dict_dev = df_indexed[col_device].to_dict() if col_device else {} for name, idx in self.map.items(): l_val = dict_one.get(name, -1) try: l_val = int(l_val) except: l_val = 0 d_val = dict_dev.get(name, None) d_val = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None self.node_props[idx] = {'one_layer': l_val, 'device': d_val} # 初始化回合状态变量 self.current_window_idx = 0 self.current_node_idx = 0 self.prev_node_idx = 0 self.trigger_node_idx = 0 self.path = [] self.current_expert_paths = [] self.target_roots = set() def _find_col(self, df, keyword): if keyword in df.columns: return keyword for c in df.columns: if c.lower() == keyword.lower(): return c return None def reset(self, force_window_idx=None, force_trigger=None): """ 重置环境,开启新的一轮寻路 (Episode)。 随机选取一个发生异常的时间窗口和触发报警的传感器作为起点。 """ if force_window_idx is not None: self.current_window_idx = force_window_idx t_name = force_trigger else: found = False # 尝试随机寻找一个存在触发变量异常的时间窗口 for _ in range(100): w_idx = np.random.randint(len(self.scores)) win_scores = self.scores[w_idx] candidates = [] for t_name in config.TRIGGER_SENSORS: if t_name in self.map: idx = self.map[t_name] # 只有当诱发变量得分超过触发阈值,才将其作为候选起点 if win_scores[idx] > config.TRIGGER_SCORE_THRESH: candidates.append(t_name) if candidates: self.current_window_idx = w_idx t_name = np.random.choice(candidates) found = True break if not found: self.current_window_idx = np.random.randint(len(self.scores)) t_name = list(self.map.keys())[0] # 初始化路径状态 self.current_node_idx = self.map.get(t_name, 0) self.trigger_node_idx = self.current_node_idx self.prev_node_idx = self.current_node_idx self.path = [self.current_node_idx] # 加载对应的专家知识作为本回合的目标(用于计算奖励) self.target_roots = set() self.current_expert_paths = [] if self.current_node_idx in self.expert_knowledge: entry = self.expert_knowledge[self.current_node_idx] self.target_roots = entry['roots'] self.current_expert_paths = entry['paths'] return self._get_state() def _get_state(self): """ 获取当前状态观测值 (Observation)。 将离散的 ID 信息与连续的异常分数/层级信息打包,供神经网络提取特征。 """ curr_score = self.scores[self.current_window_idx, self.current_node_idx] prev_score = self.scores[self.current_window_idx, self.prev_node_idx] curr_layer = self.node_props[self.current_node_idx]['one_layer'] / 20.0 return ( torch.LongTensor([self.current_node_idx, self.prev_node_idx, self.trigger_node_idx]), torch.FloatTensor([curr_score, prev_score, curr_layer]) ) def get_valid_actions(self, curr_idx): """ 动作掩码 (Action Masking) 机制。 根据因果图和业务规则,告诉 AI 当前这一步可以走向哪些邻居节点。 """ # 从邻接矩阵获取物理相邻的节点 neighbors = np.where(self.adj[curr_idx] == 1)[0] curr_props = self.node_props[curr_idx] curr_l, curr_d = curr_props['one_layer'], curr_props['device'] valid = [] for n in neighbors: if n in self.path: continue tgt_props = self.node_props[n] tgt_l, tgt_d = tgt_props['one_layer'], tgt_props['device'] # 双重保险:再次校验层级和设备约束 if curr_l != 0 and tgt_l != 0: if not ((tgt_l == curr_l) or (tgt_l == curr_l - 1)): continue if (curr_d is not None) and (tgt_d is not None): if curr_d != tgt_d: continue valid.append(n) return np.array(valid) def step(self, action_idx): """ AI 执行一步动作,环境返回新的状态和获得的奖励 (Reward)。 奖励函数 (Reward Function) 是整个 AI 的价值观,决定了它的行为倾向。 """ prev = self.current_node_idx self.prev_node_idx = prev self.current_node_idx = action_idx self.path.append(action_idx) score_curr = self.scores[self.current_window_idx, self.current_node_idx] reward = 0.0 done = False # [奖励 1:模仿专家经验] # 如果走到了历史记录过的异常节点上,给予正反馈 in_expert_nodes = False for e_path in self.current_expert_paths: if action_idx in e_path: in_expert_nodes = True break if in_expert_nodes: reward += 2.0 else: reward -= 0.2 # 探索未知节点的轻微惩罚,避免随意瞎走 # [奖励 2:命中最终根因] # 成功找到了真正的罪魁祸首,给予巨额奖励并结束本回合 if action_idx in self.target_roots: reward += 10.0 done = True # [奖励 3:异常梯度导向] # 鼓励 AI 顺着异常分越来越高的方向走(异常传导衰减原理) score_prev = self.scores[self.current_window_idx, prev] diff = score_curr - score_prev if diff > 0: reward += diff * 3.0 else: reward -= 0.5 # [惩罚 1:路径过长] 防止发散 if len(self.path) >= config.MAX_PATH_LENGTH: done = True if action_idx not in self.target_roots: reward -= 5.0 # [惩罚 2:走入正常区域] 如果走到了异常分很低的节点,说明找错方向了 if score_curr < 0.15 and len(self.path) > 3: done = True reward -= 2.0 return self._get_state(), reward, done, {} # ----------------- 2. 神经网络架构 (Actor-Critic) ----------------- class TargetDrivenActorCritic(nn.Module): """ 智能体的“大脑”,采用 Actor-Critic 双头输出架构。 Actor 负责决定“下一步去哪”(策略),Critic 负责评估“当前局势有多好”(价值)。 """ def __init__(self, num_sensors, embedding_dim=64, hidden_dim=256): super().__init__() self.node_emb = nn.Embedding(num_sensors, embedding_dim) input_dim = (embedding_dim * 3) + 3 self.shared_net = nn.Sequential( nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.LayerNorm(hidden_dim), nn.Linear(hidden_dim, hidden_dim), nn.ReLU() ) self.actor = nn.Linear(hidden_dim, num_sensors) self.critic = nn.Linear(hidden_dim, 1) def forward(self, int_data, float_data): curr_emb = self.node_emb(int_data[:, 0]) prev_emb = self.node_emb(int_data[:, 1]) trig_emb = self.node_emb(int_data[:, 2]) x = torch.cat([curr_emb, prev_emb, trig_emb, float_data], dim=1) feat = self.shared_net(x) return self.actor(feat), self.critic(feat) # ----------------- 3. 训练器 ----------------- class RLTrainer: def __init__(self, causal_graph, train_scores, threshold_df): self.sensor_map = causal_graph['sensor_to_idx'] self.idx_to_sensor = {v: k for k, v in self.sensor_map.items()} self.threshold_df = threshold_df self.causal_graph = causal_graph self.expert_knowledge, self.bc_samples, _ = self._load_expert_data() self.env = CausalTracingEnv(causal_graph, train_scores, threshold_df, self.expert_knowledge) self.model = TargetDrivenActorCritic(self.env.num_sensors, config.EMBEDDING_DIM, config.HIDDEN_DIM) self.optimizer = optim.Adam(self.model.parameters(), lr=config.PPO_LR) def _load_expert_data(self): path = config.ABNORMAL_LINK_FILENAME kb_data = {} bc_data = [] if not os.path.exists(path): return kb_data, bc_data, None df = pd.read_excel(path) for _, row in df.iterrows(): link = str(row.get('Link Path', '')) if not link: continue nodes_str = [n.strip() for n in link.replace('→', '->').split('->')] path_nodes = nodes_str[::-1] ids = [] valid = True for n in path_nodes: if n in self.sensor_map: ids.append(self.sensor_map[n]) else: valid = False; break if not valid or len(ids)<2: continue trigger_id = ids[0] root_id = ids[-1] if trigger_id not in kb_data: kb_data[trigger_id] = {'paths': [], 'roots': set(), 'logic': row.get('Process Logic Basis', '')} kb_data[trigger_id]['paths'].append(ids) kb_data[trigger_id]['roots'].add(root_id) for i in range(len(ids) - 1): curr = ids[i] prev = ids[max(0, i-1)] nxt = ids[i+1] bc_data.append(((curr, prev, trigger_id), nxt)) return kb_data, bc_data, df def pretrain_bc(self): """ 第一阶段:行为克隆 (Behavior Cloning) 预训练。 相当于给 AI 上课死记硬背专家已知的异常链路,让它具备基础的业务常识。 采用标准的监督学习交叉熵损失。 """ if not self.bc_samples: return print(f"\n>>> [Step 3.1] 启动BC预训练 ({config.BC_EPOCHS}轮)...") states_int = torch.LongTensor([list(s) for s, a in self.bc_samples]) actions = torch.LongTensor([a for s, a in self.bc_samples]) states_float = torch.zeros((len(states_int), 3)) states_float[:, 0] = 0.9 states_float[:, 1] = 0.8 loss_fn = nn.CrossEntropyLoss() pbar = tqdm(range(config.BC_EPOCHS), desc="BC Training") for epoch in pbar: logits, _ = self.model(states_int, states_float) loss = loss_fn(logits, actions) self.optimizer.zero_grad() loss.backward() self.optimizer.step() if epoch%100==0: pbar.set_postfix({'Loss': f"{loss.item():.4f}"}) def train_ppo(self): """ 第二阶段:PPO 强化学习自主探索。 AI 在具有不同异常分数分布的真实数据环境中不断试错, 发现专家库中未登记的新的潜在异常链路。 """ print(f"\n>>> [Step 3.2] 启动PPO训练 ({config.RL_EPISODES}轮)...") pbar = tqdm(range(config.RL_EPISODES), desc="PPO Training") rewards_hist = [] for _ in pbar: state_data = self.env.reset() done = False ep_r = 0 b_int, b_float, b_act, b_lp, b_rew, b_mask = [], [], [], [], [], [] while not done: s_int = state_data[0].unsqueeze(0) s_float = state_data[1].unsqueeze(0) valid = self.env.get_valid_actions(s_int[0, 0].item()) if len(valid) == 0: break logits, _ = self.model(s_int, s_float) mask = torch.full_like(logits, -1e9) mask[0, valid] = 0 dist = Categorical(F.softmax(logits+mask, dim=-1)) action = dist.sample() next_s, r, done, _ = self.env.step(action.item()) b_int.append(s_int); b_float.append(s_float) b_act.append(action); b_lp.append(dist.log_prob(action)) b_rew.append(r); b_mask.append(1-done) state_data = next_s ep_r += r if len(b_rew) > 1: self._update_ppo(b_int, b_float, b_act, b_lp, b_rew, b_mask) rewards_hist.append(ep_r) if len(rewards_hist)>50: rewards_hist.pop(0) pbar.set_postfix({'AvgR': f"{np.mean(rewards_hist):.2f}"}) def _update_ppo(self, b_int, b_float, b_act, b_lp, b_rew, b_mask): """PPO 核心公式计算:折扣回报计算、优势函数、Clip 截断防止策略更新幅度过大""" returns = [] R = 0 for r, m in zip(reversed(b_rew), reversed(b_mask)): R = r + config.PPO_GAMMA * R * m returns.insert(0, R) returns = torch.tensor(returns) if returns.numel() > 1 and returns.std() > 1e-5: returns = (returns - returns.mean()) / (returns.std() + 1e-5) elif returns.numel() > 1: returns = returns - returns.mean() s_int = torch.cat(b_int) s_float = torch.cat(b_float) act = torch.stack(b_act) old_lp = torch.stack(b_lp).detach() for _ in range(config.PPO_K_EPOCHS): logits, vals = self.model(s_int, s_float) dist = Categorical(logits=logits) new_lp = dist.log_prob(act) ratio = torch.exp(new_lp - old_lp) surr1 = ratio * returns surr2 = torch.clamp(ratio, 1-config.PPO_EPS_CLIP, 1+config.PPO_EPS_CLIP) * returns v_pred = vals.squeeze() if v_pred.shape != returns.shape: v_pred = v_pred.view(-1) returns = returns.view(-1) loss = -torch.min(surr1, surr2).mean() + 0.5 * F.mse_loss(v_pred, returns) self.optimizer.zero_grad() loss.backward() self.optimizer.step() def evaluate(self, test_scores): """ 第四步:模型验证与评估。 使用未见过的测试集数据让 AI 跑全流程,评估诊断准确率和新模式发现能力。 并将结果导出为结构化的 Excel 评估报告。 """ print("\n>>> [Step 4] 评估测试集...") self.model.eval() results = [] cnt_detected = 0 cnt_kb_covered = 0 cnt_path_match = 0 cnt_root_match = 0 cnt_new = 0 env = CausalTracingEnv(self.causal_graph, test_scores, self.threshold_df, self.expert_knowledge) for win_idx in range(len(test_scores)): scores = test_scores[win_idx] active = [] for t_name in config.TRIGGER_SENSORS: if t_name in self.sensor_map: idx = self.sensor_map[t_name] if scores[idx] > config.TRIGGER_SCORE_THRESH: active.append((t_name, idx)) for t_name, t_idx in active: cnt_detected += 1 state_data = env.reset(force_window_idx=win_idx, force_trigger=t_name) path_idxs = [t_idx] done = False while not done: s_int = state_data[0].unsqueeze(0) s_float = state_data[1].unsqueeze(0) valid = env.get_valid_actions(path_idxs[-1]) if len(valid) == 0: break logits, _ = self.model(s_int, s_float) mask = torch.full_like(logits, -1e9) mask[0, valid] = 0 act = torch.argmax(logits + mask, dim=1).item() state_data, _, done, _ = env.step(act) path_idxs.append(act) if len(path_idxs) >= config.MAX_PATH_LENGTH: done = True path_names = [self.idx_to_sensor[i] for i in path_idxs] root = path_names[-1] root_score = scores[self.sensor_map[root]] match_status = "未定义" logic = "" if t_idx in self.expert_knowledge: cnt_kb_covered += 1 entry = self.expert_knowledge[t_idx] logic = entry.get('logic', '') real_roots = [self.idx_to_sensor[r] for r in entry['roots']] rm = False for p_node in path_names: if p_node in real_roots: rm = True break pm = False path_set = set(path_idxs) for exp_p in entry['paths']: exp_set = set(exp_p) intersection = len(path_set.intersection(exp_set)) union = len(path_set.union(exp_set)) if union > 0 and (intersection / union) >= 0.6: pm = True break if pm: match_status = "路径吻合" cnt_path_match += 1 cnt_root_match += 1 elif rm: match_status = "仅根因吻合" cnt_root_match += 1 else: match_status = "不吻合" else: match_status = "新链路" cnt_new += 1 results.append({ "窗口ID": win_idx, "诱发变量": t_name, "溯源路径": "->".join(path_names), "根因变量": root, "根因异常分": f"{root_score:.3f}", "是否知识库": "是" if t_idx in self.expert_knowledge else "否", "匹配情况": match_status, "机理描述": logic }) denom = max(cnt_kb_covered, 1) summary = [ {"指标": "检测到的总异常样本数", "数值": cnt_detected}, {"指标": "知识库覆盖的样本数", "数值": cnt_kb_covered}, {"指标": "异常链路准确率", "数值": f"{cnt_path_match/denom:.2%}"}, {"指标": "根因准确率", "数值": f"{cnt_root_match/denom:.2%}"}, {"指标": "新发现异常模式数", "数值": cnt_new} ] save_path = f"{config.RESULT_SAVE_DIR}/{config.TEST_RESULT_FILENAME}" with pd.ExcelWriter(save_path, engine='openpyxl') as writer: pd.DataFrame(summary).to_excel(writer, sheet_name='Sheet1_概览指标', index=False) pd.DataFrame(results).to_excel(writer, sheet_name='Sheet2_测试集详情', index=False) print("\n" + "="*50) print(pd.DataFrame(summary).to_string(index=False)) print(f"\n文件已保存: {save_path}") print("="*50) def save_model(self): path = config.MODEL_FILE_PATH torch.save(self.model.state_dict(), path)