| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504 |
- # -*- coding: utf-8 -*-
- """
- rl_tracing.py: 强化学习链路级异常溯源
- 基于 PPO (Proximal Policy Optimization) 的 Actor-Critic 架构。
- 结合了专家经验的“行为克隆 (Imitation Learning)”与“自主探索 (Reinforcement Learning)”,
- 实现从“诱发变量”逆流而上寻找“根因变量”的智能寻路。
- """
- import torch
- import torch.nn as nn
- import torch.optim as optim
- import torch.nn.functional as F
- from torch.distributions import Categorical
- import numpy as np
- import pandas as pd
- import os
- from tqdm import tqdm
- from config import config
- # ----------------- 1. 环境 -----------------
- class CausalTracingEnv:
- """
- 强化学习交互环境。
- 定义了 AI 智能体的状态(State)、动作空间(Action Space)以及奖励机制(Reward Function)。
- """
- def __init__(self, causal_graph, window_scores, threshold_df, expert_knowledge=None):
- self.sensor_list = causal_graph['sensor_list']
- self.map = causal_graph['sensor_to_idx']
- self.idx_to_sensor = {v: k for k, v in self.map.items()}
- self.adj = causal_graph['adj_matrix']
- self.scores = window_scores
-
- # 专家历史异常链路知识库
- self.expert_knowledge = expert_knowledge if expert_knowledge else {}
- self.num_sensors = len(self.sensor_list)
-
- # 解析每个传感器的层级 (Layer) 和归属设备 (Device) 属性,用于限制非法动作
- self.node_props = {}
- col_one_layer = self._find_col(threshold_df, config.KEYWORD_LAYER)
- col_device = self._find_col(threshold_df, config.KEYWORD_DEVICE)
-
- df_indexed = threshold_df.set_index('ID')
- dict_one = df_indexed[col_one_layer].to_dict() if col_one_layer else {}
- dict_dev = df_indexed[col_device].to_dict() if col_device else {}
-
- for name, idx in self.map.items():
- l_val = dict_one.get(name, -1)
- try: l_val = int(l_val)
- except: l_val = 0
- d_val = dict_dev.get(name, None)
- d_val = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None
- self.node_props[idx] = {'one_layer': l_val, 'device': d_val}
- # 初始化回合状态变量
- self.current_window_idx = 0
- self.current_node_idx = 0
- self.prev_node_idx = 0
- self.trigger_node_idx = 0
- self.path = []
- self.current_expert_paths = []
- self.target_roots = set()
-
- def _find_col(self, df, keyword):
- if keyword in df.columns: return keyword
- for c in df.columns:
- if c.lower() == keyword.lower(): return c
- return None
- def reset(self, force_window_idx=None, force_trigger=None):
- """
- 重置环境,开启新的一轮寻路 (Episode)。
- 随机选取一个发生异常的时间窗口和触发报警的传感器作为起点。
- """
- if force_window_idx is not None:
- self.current_window_idx = force_window_idx
- t_name = force_trigger
- else:
- found = False
- # 尝试随机寻找一个存在触发变量异常的时间窗口
- for _ in range(100):
- w_idx = np.random.randint(len(self.scores))
- win_scores = self.scores[w_idx]
- candidates = []
- for t_name in config.TRIGGER_SENSORS:
- if t_name in self.map:
- idx = self.map[t_name]
- # 只有当诱发变量得分超过触发阈值,才将其作为候选起点
- if win_scores[idx] > config.TRIGGER_SCORE_THRESH:
- candidates.append(t_name)
- if candidates:
- self.current_window_idx = w_idx
- t_name = np.random.choice(candidates)
- found = True
- break
- if not found:
- self.current_window_idx = np.random.randint(len(self.scores))
- t_name = list(self.map.keys())[0]
-
- # 初始化路径状态
- self.current_node_idx = self.map.get(t_name, 0)
- self.trigger_node_idx = self.current_node_idx
- self.prev_node_idx = self.current_node_idx
- self.path = [self.current_node_idx]
-
- # 加载对应的专家知识作为本回合的目标(用于计算奖励)
- self.target_roots = set()
- self.current_expert_paths = []
- if self.current_node_idx in self.expert_knowledge:
- entry = self.expert_knowledge[self.current_node_idx]
- self.target_roots = entry['roots']
- self.current_expert_paths = entry['paths']
-
- return self._get_state()
-
- def _get_state(self):
- """
- 获取当前状态观测值 (Observation)。
- 将离散的 ID 信息与连续的异常分数/层级信息打包,供神经网络提取特征。
- """
- curr_score = self.scores[self.current_window_idx, self.current_node_idx]
- prev_score = self.scores[self.current_window_idx, self.prev_node_idx]
- curr_layer = self.node_props[self.current_node_idx]['one_layer'] / 20.0
-
- return (
- torch.LongTensor([self.current_node_idx, self.prev_node_idx, self.trigger_node_idx]),
- torch.FloatTensor([curr_score, prev_score, curr_layer])
- )
-
- def get_valid_actions(self, curr_idx):
- """
- 动作掩码 (Action Masking) 机制。
- 根据因果图和业务规则,告诉 AI 当前这一步可以走向哪些邻居节点。
- """
- # 从邻接矩阵获取物理相邻的节点
- neighbors = np.where(self.adj[curr_idx] == 1)[0]
- curr_props = self.node_props[curr_idx]
- curr_l, curr_d = curr_props['one_layer'], curr_props['device']
- valid = []
- for n in neighbors:
- if n in self.path: continue
- tgt_props = self.node_props[n]
- tgt_l, tgt_d = tgt_props['one_layer'], tgt_props['device']
-
- # 双重保险:再次校验层级和设备约束
- if curr_l != 0 and tgt_l != 0:
- if not ((tgt_l == curr_l) or (tgt_l == curr_l - 1)): continue
- if (curr_d is not None) and (tgt_d is not None):
- if curr_d != tgt_d: continue
- valid.append(n)
- return np.array(valid)
-
- def step(self, action_idx):
- """
- AI 执行一步动作,环境返回新的状态和获得的奖励 (Reward)。
- 奖励函数 (Reward Function) 是整个 AI 的价值观,决定了它的行为倾向。
- """
- prev = self.current_node_idx
- self.prev_node_idx = prev
- self.current_node_idx = action_idx
- self.path.append(action_idx)
-
- score_curr = self.scores[self.current_window_idx, self.current_node_idx]
-
- reward = 0.0
- done = False
-
- # [奖励 1:模仿专家经验]
- # 如果走到了历史记录过的异常节点上,给予正反馈
- in_expert_nodes = False
- for e_path in self.current_expert_paths:
- if action_idx in e_path:
- in_expert_nodes = True
- break
-
- if in_expert_nodes: reward += 2.0
- else: reward -= 0.2 # 探索未知节点的轻微惩罚,避免随意瞎走
-
- # [奖励 2:命中最终根因]
- # 成功找到了真正的罪魁祸首,给予巨额奖励并结束本回合
- if action_idx in self.target_roots:
- reward += 10.0
- done = True
-
- # [奖励 3:异常梯度导向]
- # 鼓励 AI 顺着异常分越来越高的方向走(异常传导衰减原理)
- score_prev = self.scores[self.current_window_idx, prev]
- diff = score_curr - score_prev
- if diff > 0: reward += diff * 3.0
- else: reward -= 0.5
-
- # [惩罚 1:路径过长] 防止发散
- if len(self.path) >= config.MAX_PATH_LENGTH:
- done = True
- if action_idx not in self.target_roots: reward -= 5.0
-
- # [惩罚 2:走入正常区域] 如果走到了异常分很低的节点,说明找错方向了
- if score_curr < 0.15 and len(self.path) > 3:
- done = True
- reward -= 2.0
-
- return self._get_state(), reward, done, {}
- # ----------------- 2. 神经网络架构 (Actor-Critic) -----------------
- class TargetDrivenActorCritic(nn.Module):
- """
- 智能体的“大脑”,采用 Actor-Critic 双头输出架构。
- Actor 负责决定“下一步去哪”(策略),Critic 负责评估“当前局势有多好”(价值)。
- """
- def __init__(self, num_sensors, embedding_dim=64, hidden_dim=256):
- super().__init__()
- self.node_emb = nn.Embedding(num_sensors, embedding_dim)
- input_dim = (embedding_dim * 3) + 3
-
- self.shared_net = nn.Sequential(
- nn.Linear(input_dim, hidden_dim),
- nn.ReLU(),
- nn.LayerNorm(hidden_dim),
- nn.Linear(hidden_dim, hidden_dim),
- nn.ReLU()
- )
- self.actor = nn.Linear(hidden_dim, num_sensors)
- self.critic = nn.Linear(hidden_dim, 1)
-
- def forward(self, int_data, float_data):
- curr_emb = self.node_emb(int_data[:, 0])
- prev_emb = self.node_emb(int_data[:, 1])
- trig_emb = self.node_emb(int_data[:, 2])
- x = torch.cat([curr_emb, prev_emb, trig_emb, float_data], dim=1)
- feat = self.shared_net(x)
- return self.actor(feat), self.critic(feat)
- # ----------------- 3. 训练器 -----------------
- class RLTrainer:
- def __init__(self, causal_graph, train_scores, threshold_df):
- self.sensor_map = causal_graph['sensor_to_idx']
- self.idx_to_sensor = {v: k for k, v in self.sensor_map.items()}
- self.threshold_df = threshold_df
- self.causal_graph = causal_graph
- self.expert_knowledge, self.bc_samples, _ = self._load_expert_data()
-
- self.env = CausalTracingEnv(causal_graph, train_scores, threshold_df, self.expert_knowledge)
- self.model = TargetDrivenActorCritic(self.env.num_sensors, config.EMBEDDING_DIM, config.HIDDEN_DIM)
- self.optimizer = optim.Adam(self.model.parameters(), lr=config.PPO_LR)
-
- def _load_expert_data(self):
- path = config.ABNORMAL_LINK_FILENAME
- kb_data = {}
- bc_data = []
- if not os.path.exists(path): return kb_data, bc_data, None
-
- df = pd.read_excel(path)
- for _, row in df.iterrows():
- link = str(row.get('Link Path', ''))
- if not link: continue
- nodes_str = [n.strip() for n in link.replace('→', '->').split('->')]
- path_nodes = nodes_str[::-1]
-
- ids = []
- valid = True
- for n in path_nodes:
- if n in self.sensor_map: ids.append(self.sensor_map[n])
- else: valid = False; break
- if not valid or len(ids)<2: continue
-
- trigger_id = ids[0]
- root_id = ids[-1]
-
- if trigger_id not in kb_data:
- kb_data[trigger_id] = {'paths': [], 'roots': set(), 'logic': row.get('Process Logic Basis', '')}
- kb_data[trigger_id]['paths'].append(ids)
- kb_data[trigger_id]['roots'].add(root_id)
-
- for i in range(len(ids) - 1):
- curr = ids[i]
- prev = ids[max(0, i-1)]
- nxt = ids[i+1]
- bc_data.append(((curr, prev, trigger_id), nxt))
-
- return kb_data, bc_data, df
- def pretrain_bc(self):
- """
- 第一阶段:行为克隆 (Behavior Cloning) 预训练。
- 相当于给 AI 上课死记硬背专家已知的异常链路,让它具备基础的业务常识。
- 采用标准的监督学习交叉熵损失。
- """
- if not self.bc_samples: return
- print(f"\n>>> [Step 3.1] 启动BC预训练 ({config.BC_EPOCHS}轮)...")
- states_int = torch.LongTensor([list(s) for s, a in self.bc_samples])
- actions = torch.LongTensor([a for s, a in self.bc_samples])
- states_float = torch.zeros((len(states_int), 3))
- states_float[:, 0] = 0.9
- states_float[:, 1] = 0.8
-
- loss_fn = nn.CrossEntropyLoss()
- pbar = tqdm(range(config.BC_EPOCHS), desc="BC Training")
- for epoch in pbar:
- logits, _ = self.model(states_int, states_float)
- loss = loss_fn(logits, actions)
- self.optimizer.zero_grad()
- loss.backward()
- self.optimizer.step()
- if epoch%100==0: pbar.set_postfix({'Loss': f"{loss.item():.4f}"})
- def train_ppo(self):
- """
- 第二阶段:PPO 强化学习自主探索。
- AI 在具有不同异常分数分布的真实数据环境中不断试错,
- 发现专家库中未登记的新的潜在异常链路。
- """
- print(f"\n>>> [Step 3.2] 启动PPO训练 ({config.RL_EPISODES}轮)...")
- pbar = tqdm(range(config.RL_EPISODES), desc="PPO Training")
- rewards_hist = []
- for _ in pbar:
- state_data = self.env.reset()
- done = False
- ep_r = 0
- b_int, b_float, b_act, b_lp, b_rew, b_mask = [], [], [], [], [], []
-
- while not done:
- s_int = state_data[0].unsqueeze(0)
- s_float = state_data[1].unsqueeze(0)
- valid = self.env.get_valid_actions(s_int[0, 0].item())
- if len(valid) == 0: break
-
- logits, _ = self.model(s_int, s_float)
- mask = torch.full_like(logits, -1e9)
- mask[0, valid] = 0
- dist = Categorical(F.softmax(logits+mask, dim=-1))
- action = dist.sample()
-
- next_s, r, done, _ = self.env.step(action.item())
- b_int.append(s_int); b_float.append(s_float)
- b_act.append(action); b_lp.append(dist.log_prob(action))
- b_rew.append(r); b_mask.append(1-done)
- state_data = next_s
- ep_r += r
-
- if len(b_rew) > 1:
- self._update_ppo(b_int, b_float, b_act, b_lp, b_rew, b_mask)
-
- rewards_hist.append(ep_r)
- if len(rewards_hist)>50: rewards_hist.pop(0)
- pbar.set_postfix({'AvgR': f"{np.mean(rewards_hist):.2f}"})
- def _update_ppo(self, b_int, b_float, b_act, b_lp, b_rew, b_mask):
- """PPO 核心公式计算:折扣回报计算、优势函数、Clip 截断防止策略更新幅度过大"""
- returns = []
- R = 0
- for r, m in zip(reversed(b_rew), reversed(b_mask)):
- R = r + config.PPO_GAMMA * R * m
- returns.insert(0, R)
- returns = torch.tensor(returns)
-
- if returns.numel() > 1 and returns.std() > 1e-5:
- returns = (returns - returns.mean()) / (returns.std() + 1e-5)
- elif returns.numel() > 1:
- returns = returns - returns.mean()
-
- s_int = torch.cat(b_int)
- s_float = torch.cat(b_float)
- act = torch.stack(b_act)
- old_lp = torch.stack(b_lp).detach()
-
- for _ in range(config.PPO_K_EPOCHS):
- logits, vals = self.model(s_int, s_float)
- dist = Categorical(logits=logits)
- new_lp = dist.log_prob(act)
- ratio = torch.exp(new_lp - old_lp)
-
- surr1 = ratio * returns
- surr2 = torch.clamp(ratio, 1-config.PPO_EPS_CLIP, 1+config.PPO_EPS_CLIP) * returns
-
- v_pred = vals.squeeze()
- if v_pred.shape != returns.shape:
- v_pred = v_pred.view(-1)
- returns = returns.view(-1)
-
- loss = -torch.min(surr1, surr2).mean() + 0.5 * F.mse_loss(v_pred, returns)
- self.optimizer.zero_grad()
- loss.backward()
- self.optimizer.step()
- def evaluate(self, test_scores):
- """
- 第四步:模型验证与评估。
- 使用未见过的测试集数据让 AI 跑全流程,评估诊断准确率和新模式发现能力。
- 并将结果导出为结构化的 Excel 评估报告。
- """
- print("\n>>> [Step 4] 评估测试集...")
- self.model.eval()
- results = []
-
- cnt_detected = 0
- cnt_kb_covered = 0
- cnt_path_match = 0
- cnt_root_match = 0
- cnt_new = 0
-
- env = CausalTracingEnv(self.causal_graph, test_scores, self.threshold_df, self.expert_knowledge)
-
- for win_idx in range(len(test_scores)):
- scores = test_scores[win_idx]
- active = []
- for t_name in config.TRIGGER_SENSORS:
- if t_name in self.sensor_map:
- idx = self.sensor_map[t_name]
- if scores[idx] > config.TRIGGER_SCORE_THRESH:
- active.append((t_name, idx))
-
- for t_name, t_idx in active:
- cnt_detected += 1
- state_data = env.reset(force_window_idx=win_idx, force_trigger=t_name)
- path_idxs = [t_idx]
- done = False
-
- while not done:
- s_int = state_data[0].unsqueeze(0)
- s_float = state_data[1].unsqueeze(0)
- valid = env.get_valid_actions(path_idxs[-1])
- if len(valid) == 0: break
-
- logits, _ = self.model(s_int, s_float)
- mask = torch.full_like(logits, -1e9)
- mask[0, valid] = 0
- act = torch.argmax(logits + mask, dim=1).item()
- state_data, _, done, _ = env.step(act)
- path_idxs.append(act)
- if len(path_idxs) >= config.MAX_PATH_LENGTH: done = True
-
- path_names = [self.idx_to_sensor[i] for i in path_idxs]
- root = path_names[-1]
- root_score = scores[self.sensor_map[root]]
-
- match_status = "未定义"
- logic = ""
-
- if t_idx in self.expert_knowledge:
- cnt_kb_covered += 1
- entry = self.expert_knowledge[t_idx]
- logic = entry.get('logic', '')
-
- real_roots = [self.idx_to_sensor[r] for r in entry['roots']]
- rm = False
- for p_node in path_names:
- if p_node in real_roots:
- rm = True
- break
-
- pm = False
- path_set = set(path_idxs)
- for exp_p in entry['paths']:
- exp_set = set(exp_p)
- intersection = len(path_set.intersection(exp_set))
- union = len(path_set.union(exp_set))
- if union > 0 and (intersection / union) >= 0.6:
- pm = True
- break
-
- if pm:
- match_status = "路径吻合"
- cnt_path_match += 1
- cnt_root_match += 1
- elif rm:
- match_status = "仅根因吻合"
- cnt_root_match += 1
- else:
- match_status = "不吻合"
- else:
- match_status = "新链路"
- cnt_new += 1
-
- results.append({
- "窗口ID": win_idx,
- "诱发变量": t_name,
- "溯源路径": "->".join(path_names),
- "根因变量": root,
- "根因异常分": f"{root_score:.3f}",
- "是否知识库": "是" if t_idx in self.expert_knowledge else "否",
- "匹配情况": match_status,
- "机理描述": logic
- })
- denom = max(cnt_kb_covered, 1)
- summary = [
- {"指标": "检测到的总异常样本数", "数值": cnt_detected},
- {"指标": "知识库覆盖的样本数", "数值": cnt_kb_covered},
- {"指标": "异常链路准确率", "数值": f"{cnt_path_match/denom:.2%}"},
- {"指标": "根因准确率", "数值": f"{cnt_root_match/denom:.2%}"},
- {"指标": "新发现异常模式数", "数值": cnt_new}
- ]
-
- save_path = f"{config.RESULT_SAVE_DIR}/{config.TEST_RESULT_FILENAME}"
- with pd.ExcelWriter(save_path, engine='openpyxl') as writer:
- pd.DataFrame(summary).to_excel(writer, sheet_name='Sheet1_概览指标', index=False)
- pd.DataFrame(results).to_excel(writer, sheet_name='Sheet2_测试集详情', index=False)
-
- print("\n" + "="*50)
- print(pd.DataFrame(summary).to_string(index=False))
- print(f"\n文件已保存: {save_path}")
- print("="*50)
- def save_model(self):
- path = config.MODEL_FILE_PATH
- torch.save(self.model.state_dict(), path)
|