|
|
@@ -1,5 +1,10 @@
|
|
|
# -*- coding: utf-8 -*-
|
|
|
-"""rl_tracing.py: 强化学习链路级异常溯源"""
|
|
|
+"""
|
|
|
+rl_tracing.py: 强化学习链路级异常溯源
|
|
|
+基于 PPO (Proximal Policy Optimization) 的 Actor-Critic 架构。
|
|
|
+结合了专家经验的“行为克隆 (Behavior Cloning)”与“自主探索 (Reinforcement Learning)”,
|
|
|
+实现从“诱发变量”逆流而上寻找“根因变量”的智能寻路。
|
|
|
+"""
|
|
|
import torch
|
|
|
import torch.nn as nn
|
|
|
import torch.optim as optim
|
|
|
@@ -13,6 +18,10 @@ from config import config
|
|
|
|
|
|
# ----------------- 1. 环境 -----------------
|
|
|
class CausalTracingEnv:
|
|
|
+ """
|
|
|
+ 强化学习交互环境。
|
|
|
+    定义了 AI 智能体的状态(State)、动作空间(Action Space)以及奖励函数(Reward Function)。
|
|
|
+ """
|
|
|
def __init__(self, causal_graph, window_scores, threshold_df, expert_knowledge=None):
|
|
|
self.sensor_list = causal_graph['sensor_list']
|
|
|
self.map = causal_graph['sensor_to_idx']
|
|
|
@@ -20,10 +29,11 @@ class CausalTracingEnv:
|
|
|
self.adj = causal_graph['adj_matrix']
|
|
|
self.scores = window_scores
|
|
|
|
|
|
+ # 专家历史异常链路知识库
|
|
|
self.expert_knowledge = expert_knowledge if expert_knowledge else {}
|
|
|
self.num_sensors = len(self.sensor_list)
|
|
|
|
|
|
- # 解析属性
|
|
|
+ # 解析每个传感器的层级 (Layer) 和归属设备 (Device) 属性,用于限制非法动作
|
|
|
self.node_props = {}
|
|
|
col_one_layer = self._find_col(threshold_df, config.KEYWORD_LAYER)
|
|
|
col_device = self._find_col(threshold_df, config.KEYWORD_DEVICE)
|
|
|
@@ -40,6 +50,7 @@ class CausalTracingEnv:
|
|
|
d_val = str(d_val).strip() if pd.notna(d_val) and str(d_val).strip() != '' else None
|
|
|
self.node_props[idx] = {'one_layer': l_val, 'device': d_val}
|
|
|
|
|
|
+ # 初始化回合状态变量
|
|
|
self.current_window_idx = 0
|
|
|
self.current_node_idx = 0
|
|
|
self.prev_node_idx = 0
|
|
|
@@ -55,11 +66,16 @@ class CausalTracingEnv:
|
|
|
return None
|
|
|
|
|
|
def reset(self, force_window_idx=None, force_trigger=None):
|
|
|
+ """
|
|
|
+ 重置环境,开启新的一轮寻路 (Episode)。
|
|
|
+ 随机选取一个发生异常的时间窗口和触发报警的传感器作为起点。
|
|
|
+ """
|
|
|
if force_window_idx is not None:
|
|
|
self.current_window_idx = force_window_idx
|
|
|
t_name = force_trigger
|
|
|
else:
|
|
|
found = False
|
|
|
+ # 尝试随机寻找一个存在触发变量异常的时间窗口
|
|
|
for _ in range(100):
|
|
|
w_idx = np.random.randint(len(self.scores))
|
|
|
win_scores = self.scores[w_idx]
|
|
|
@@ -67,6 +83,7 @@ class CausalTracingEnv:
|
|
|
for t_name in config.TRIGGER_SENSORS:
|
|
|
if t_name in self.map:
|
|
|
idx = self.map[t_name]
|
|
|
+ # 只有当诱发变量得分超过触发阈值,才将其作为候选起点
|
|
|
if win_scores[idx] > config.TRIGGER_SCORE_THRESH:
|
|
|
candidates.append(t_name)
|
|
|
if candidates:
|
|
|
@@ -77,12 +94,14 @@ class CausalTracingEnv:
|
|
|
if not found:
|
|
|
self.current_window_idx = np.random.randint(len(self.scores))
|
|
|
t_name = list(self.map.keys())[0]
|
|
|
-
|
|
|
+
|
|
|
+ # 初始化路径状态
|
|
|
self.current_node_idx = self.map.get(t_name, 0)
|
|
|
self.trigger_node_idx = self.current_node_idx
|
|
|
self.prev_node_idx = self.current_node_idx
|
|
|
self.path = [self.current_node_idx]
|
|
|
|
|
|
+ # 加载对应的专家知识作为本回合的目标(用于计算奖励)
|
|
|
self.target_roots = set()
|
|
|
self.current_expert_paths = []
|
|
|
if self.current_node_idx in self.expert_knowledge:
|
|
|
@@ -93,6 +112,10 @@ class CausalTracingEnv:
|
|
|
return self._get_state()
|
|
|
|
|
|
def _get_state(self):
|
|
|
+ """
|
|
|
+ 获取当前状态观测值 (Observation)。
|
|
|
+ 将离散的 ID 信息与连续的异常分数/层级信息打包,供神经网络提取特征。
|
|
|
+ """
|
|
|
curr_score = self.scores[self.current_window_idx, self.current_node_idx]
|
|
|
prev_score = self.scores[self.current_window_idx, self.prev_node_idx]
|
|
|
curr_layer = self.node_props[self.current_node_idx]['one_layer'] / 20.0
|
|
|
@@ -103,6 +126,11 @@ class CausalTracingEnv:
|
|
|
)
|
|
|
|
|
|
def get_valid_actions(self, curr_idx):
|
|
|
+ """
|
|
|
+ 动作掩码 (Action Masking) 机制。
|
|
|
+ 根据因果图和业务规则,告诉 AI 当前这一步可以走向哪些邻居节点。
|
|
|
+ """
|
|
|
+ # 从邻接矩阵获取物理相邻的节点
|
|
|
neighbors = np.where(self.adj[curr_idx] == 1)[0]
|
|
|
curr_props = self.node_props[curr_idx]
|
|
|
curr_l, curr_d = curr_props['one_layer'], curr_props['device']
|
|
|
@@ -111,6 +139,8 @@ class CausalTracingEnv:
|
|
|
if n in self.path: continue
|
|
|
tgt_props = self.node_props[n]
|
|
|
tgt_l, tgt_d = tgt_props['one_layer'], tgt_props['device']
|
|
|
+
|
|
|
+ # 双重保险:再次校验层级和设备约束
|
|
|
if curr_l != 0 and tgt_l != 0:
|
|
|
if not ((tgt_l == curr_l) or (tgt_l == curr_l - 1)): continue
|
|
|
if (curr_d is not None) and (tgt_d is not None):
|
|
|
@@ -119,6 +149,10 @@ class CausalTracingEnv:
|
|
|
return np.array(valid)
|
|
|
|
|
|
def step(self, action_idx):
|
|
|
+ """
|
|
|
+ AI 执行一步动作,环境返回新的状态和获得的奖励 (Reward)。
|
|
|
+ 奖励函数 (Reward Function) 是整个 AI 的价值观,决定了它的行为倾向。
|
|
|
+ """
|
|
|
prev = self.current_node_idx
|
|
|
self.prev_node_idx = prev
|
|
|
self.current_node_idx = action_idx
|
|
|
@@ -129,7 +163,8 @@ class CausalTracingEnv:
|
|
|
reward = 0.0
|
|
|
done = False
|
|
|
|
|
|
- # 奖励机制 (Imitation > Root > Gradient)
|
|
|
+ # [奖励 1:模仿专家经验]
|
|
|
+ # 如果走到了历史记录过的异常节点上,给予正反馈
|
|
|
in_expert_nodes = False
|
|
|
for e_path in self.current_expert_paths:
|
|
|
if action_idx in e_path:
|
|
|
@@ -137,29 +172,39 @@ class CausalTracingEnv:
|
|
|
break
|
|
|
|
|
|
if in_expert_nodes: reward += 2.0
|
|
|
- else: reward -= 0.2
|
|
|
+ else: reward -= 0.2 # 探索未知节点的轻微惩罚,避免随意瞎走
|
|
|
|
|
|
+ # [奖励 2:命中最终根因]
|
|
|
+ # 成功找到了真正的罪魁祸首,给予巨额奖励并结束本回合
|
|
|
if action_idx in self.target_roots:
|
|
|
reward += 10.0
|
|
|
done = True
|
|
|
|
|
|
+ # [奖励 3:异常梯度导向]
|
|
|
+ # 鼓励 AI 顺着异常分越来越高的方向走(异常传导衰减原理)
|
|
|
score_prev = self.scores[self.current_window_idx, prev]
|
|
|
diff = score_curr - score_prev
|
|
|
if diff > 0: reward += diff * 3.0
|
|
|
else: reward -= 0.5
|
|
|
|
|
|
+ # [惩罚 1:路径过长] 防止发散
|
|
|
if len(self.path) >= config.MAX_PATH_LENGTH:
|
|
|
done = True
|
|
|
if action_idx not in self.target_roots: reward -= 5.0
|
|
|
|
|
|
+ # [惩罚 2:走入正常区域] 如果走到了异常分很低的节点,说明找错方向了
|
|
|
if score_curr < 0.15 and len(self.path) > 3:
|
|
|
done = True
|
|
|
reward -= 2.0
|
|
|
|
|
|
return self._get_state(), reward, done, {}
|
|
|
|
|
|
-# ----------------- 2. 网络 -----------------
|
|
|
+# ----------------- 2. 神经网络架构 (Actor-Critic) -----------------
|
|
|
class TargetDrivenActorCritic(nn.Module):
|
|
|
+ """
|
|
|
+ 智能体的“大脑”,采用 Actor-Critic 双头输出架构。
|
|
|
+ Actor 负责决定“下一步去哪”(策略),Critic 负责评估“当前局势有多好”(价值)。
|
|
|
+ """
|
|
|
def __init__(self, num_sensors, embedding_dim=64, hidden_dim=256):
|
|
|
super().__init__()
|
|
|
self.node_emb = nn.Embedding(num_sensors, embedding_dim)
|
|
|
@@ -197,7 +242,7 @@ class RLTrainer:
|
|
|
self.optimizer = optim.Adam(self.model.parameters(), lr=config.PPO_LR)
|
|
|
|
|
|
def _load_expert_data(self):
|
|
|
- path = os.path.join(config.BASE_DIR, config.ABNORMAL_LINK_FILENAME)
|
|
|
+ path = config.ABNORMAL_LINK_FILENAME
|
|
|
kb_data = {}
|
|
|
bc_data = []
|
|
|
if not os.path.exists(path): return kb_data, bc_data, None
|
|
|
@@ -233,6 +278,11 @@ class RLTrainer:
|
|
|
return kb_data, bc_data, df
|
|
|
|
|
|
def pretrain_bc(self):
|
|
|
+ """
|
|
|
+ 第一阶段:行为克隆 (Behavior Cloning) 预训练。
|
|
|
+ 相当于给 AI 上课死记硬背专家已知的异常链路,让它具备基础的业务常识。
|
|
|
+ 采用标准的监督学习交叉熵损失。
|
|
|
+ """
|
|
|
if not self.bc_samples: return
|
|
|
print(f"\n>>> [Step 3.1] 启动BC预训练 ({config.BC_EPOCHS}轮)...")
|
|
|
states_int = torch.LongTensor([list(s) for s, a in self.bc_samples])
|
|
|
@@ -252,6 +302,11 @@ class RLTrainer:
|
|
|
if epoch%100==0: pbar.set_postfix({'Loss': f"{loss.item():.4f}"})
|
|
|
|
|
|
def train_ppo(self):
|
|
|
+ """
|
|
|
+ 第二阶段:PPO 强化学习自主探索。
|
|
|
+ AI 在具有不同异常分数分布的真实数据环境中不断试错,
|
|
|
+ 发现专家库中未登记的新的潜在异常链路。
|
|
|
+ """
|
|
|
print(f"\n>>> [Step 3.2] 启动PPO训练 ({config.RL_EPISODES}轮)...")
|
|
|
pbar = tqdm(range(config.RL_EPISODES), desc="PPO Training")
|
|
|
rewards_hist = []
|
|
|
@@ -288,6 +343,7 @@ class RLTrainer:
|
|
|
pbar.set_postfix({'AvgR': f"{np.mean(rewards_hist):.2f}"})
|
|
|
|
|
|
def _update_ppo(self, b_int, b_float, b_act, b_lp, b_rew, b_mask):
|
|
|
+ """PPO 核心公式计算:折扣回报计算、优势函数、Clip 截断防止策略更新幅度过大"""
|
|
|
returns = []
|
|
|
R = 0
|
|
|
for r, m in zip(reversed(b_rew), reversed(b_mask)):
|
|
|
@@ -325,6 +381,11 @@ class RLTrainer:
|
|
|
self.optimizer.step()
|
|
|
|
|
|
def evaluate(self, test_scores):
|
|
|
+ """
|
|
|
+ 第四步:模型验证与评估。
|
|
|
+ 使用未见过的测试集数据让 AI 跑全流程,评估诊断准确率和新模式发现能力。
|
|
|
+ 并将结果导出为结构化的 Excel 评估报告。
|
|
|
+ """
|
|
|
print("\n>>> [Step 4] 评估测试集...")
|
|
|
self.model.eval()
|
|
|
results = []
|
|
|
@@ -428,7 +489,7 @@ class RLTrainer:
|
|
|
{"指标": "新发现异常模式数", "数值": cnt_new}
|
|
|
]
|
|
|
|
|
|
- save_path = os.path.join(config.RESULT_SAVE_DIR, config.TEST_RESULT_FILENAME)
|
|
|
+ save_path = f"{config.RESULT_SAVE_DIR}/{config.TEST_RESULT_FILENAME}"
|
|
|
with pd.ExcelWriter(save_path, engine='openpyxl') as writer:
|
|
|
pd.DataFrame(summary).to_excel(writer, sheet_name='Sheet1_概览指标', index=False)
|
|
|
pd.DataFrame(results).to_excel(writer, sheet_name='Sheet2_测试集详情', index=False)
|
|
|
@@ -439,5 +500,5 @@ class RLTrainer:
|
|
|
print("="*50)
|
|
|
|
|
|
def save_model(self):
|
|
|
- path = os.path.join(config.MODEL_SAVE_DIR, "ppo_tracing_model.pth")
|
|
|
+ path = config.MODEL_FILE_PATH
|
|
|
torch.save(self.model.state_dict(), path)
|