使用 Python 构建自主 SOC 分析师群体

发布: (2026年1月20日 GMT+8 11:09)
8 min read
原文: Dev.to

Source: Dev.to

TL;DR

我构建了一个 Autonomous SOC Swarm,其中三个专门的 AI 代理(Network、Identity、Threat Intel)协同实时分析安全日志。Coordinator 代理会汇总它们的投票,并自主阻断威胁或标记异常。本文介绍了该系统的设计、Python 实现,以及我如何为网络安全模拟 Mixture‑of‑Agents 模式。

引言

在安全运营中心(SOC)的世界里,警报疲劳是真实存在的。分析师每天要处理成千上万的事件,容易精疲力竭。我思考:

我能否构建一支像资深安全团队一样思考的 AI 代理小队?

在这次实验中,我超越了单一的“聊天机器人”方式。我设计了一个群体,每个代理都有特定的角色——一个监控防火墙,一个检查用户行为,一个查询威胁情报。通过让它们投票,我旨在减少误报并自动化那些枯燥的工作。

本文内容概述

技术演练:构建用于 SOC 自动化的 Mixture‑of‑Agents (MoA) 系统。您将看到:

  • ECD(事件‑上下文‑决策)架构
  • Python 实现的投票机制
  • 在终端中模拟的“实时”仪表板

技术栈

技术用途
Python 3.12核心逻辑
Rich美观的终端 UI
Mermaid.js可视化代理思考
Pillow生成逐帧取证动画

为什么阅读它?

如果您对 Multi‑Agent SystemsCybersecurity Automation 感兴趣,这个项目弥合了两者之间的鸿沟。它不仅是理论;它是一个可运行的模拟,您可以克隆并扩展。此外,观看日志中代理们对裁决“争论”的过程也相当酷。

让我们来设计

架构概览

系统采用枢纽‑辐射(hub‑and‑spoke)模型。Coordinator 位于中心,接收来自各专用代理的输入。

Architecture

工作流

  1. 摄取 – 日志事件到达(例如,SSH 登录)。
  2. 分析 – 三个代理并行分析该事件。
  3. 投票 – 每个代理提交一个裁决(SAFESUSPICIOUSMALICIOUS)以及置信度分数。
  4. 决策 – Coordinator 权衡投票结果并执行响应。

Flow

代理通信

以下是在出现可疑事件时的消息流:

Sequence

让我们开始烹饪

我首先定义了 Agents。它们是模块化的,这样我可以轻松地切换它们的“大脑”(简单启发式 vs. LLM)。

1. Agents

# src/agents.py
from typing import Any, Dict, List

class BaseAgent:
    def __init__(self, name: str):
        self.name = name

    def analyze(self, log: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError

NetworkAgent

class NetworkAgent(BaseAgent):
    def analyze(self, log: Dict[str, Any]) -> Dict[str, Any]:
        # Simple heuristic for port‑scan detection
        if log.get("event_type") == "port_scan":
            return {
                "agent": self.name,
                "verdict": "malicious",
                "confidence": 0.95,
                "reason": f"Port scan detected from {log['source_ip']}"
            }
        return {"agent": self.name, "verdict": "safe", "confidence": 0.90}

CoordinatorAgent

class CoordinatorAgent(BaseAgent):
    def aggregate_votes(self, votes: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Mixture‑of‑Agents voting logic."""
        score = 0
        for vote in votes:
            if vote["verdict"] == "malicious":
                score += 2
            elif vote["verdict"] == "suspicious":
                score += 1

        if score >= 3:
            return {"final_verdict": "CRITICAL", "action": "BLOCK_IP"}
        elif score >= 1:
            return {"final_verdict": "WARNING", "action": "FLAG_FOR_REVIEW"}
        else:
            return {"final_verdict": "SAFE", "action": "MONITOR"}

在我看来,这个简单的评分系统往往比单一的整体提示更稳健,因为它强制达成共识。

2. 编排

下面是一个最小循环示例,它生成模拟日志数据,将其喂给 agents,并让 coordinator 做出决定。

# src/main.py
import random
import time
from agents import NetworkAgent, CoordinatorAgent

# Instantiate agents
network = NetworkAgent(name="NetworkAgent")
coordinator = CoordinatorAgent(name="Coordinator")

def mock_log():
    """Generate a random log entry."""
    events = ["ssh_login", "port_scan", "file_access"]
    event = random.choice(events)
    return {
        "event_type": event,
        "source_ip": f"10.0.{random.randint(0,255)}.{random.randint(1,254)}",
        "user": f"user{random.randint(1,5)}"
    }

def run():
    while True:
        log = mock_log()
        vote = network.analyze(log)          # In a full impl you’d call all agents
        decision = coordinator.aggregate_votes([vote])
        print(f"[{log['event_type']}] → {decision['final_verdict']}{decision['action']}")
        time.sleep(2)

if __name__ == "__main__":
    run()

运行脚本会产生一个实时更新的终端视图(在完整仓库中使用 Rich 增强),显示 agents 的投票结果以及 coordinator 的最终动作。

Wrap‑Up

  • Mixture‑of‑Agents 为您提供一种灵活、容错的方式来自动化 SOC 任务。
  • 投票机制易于扩展:可以添加更多代理、以不同方式加权投票,或接入基于 LLM 的推理。
  • 整个项目(包括更丰富的 UI、额外的代理以及 Docker 支持)都是开源的——欢迎克隆、实验并贡献!

祝玩得开心!

自主 SOC 群体演示

下面是一个使用 rich 打印 思考过程 的代码片段。

# main.py
with Live(table, refresh_per_second=4) as live:
    for incident in generator.generate_stream(count=15):
        # ...
        votes = [
            network_agent.analyze(incident),
            identity_agent.analyze(incident),
            intel_agent.analyze(incident)
        ]

        decision = coordinator.aggregate_votes(votes)
        # ... print to table ...

这使得该工具感觉像一个真实的 CLI 产品,在开发过程中提供了极佳的反馈循环。

设置

克隆仓库

git clone https://github.com/aniket-work/autonomous-soc-swarm
cd autonomous-soc-swarm

安装依赖

python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

让我们运行

运行仿真很简单:

python main.py

我观察到,随着群体处理事件,你可以清晰地看到 误报 被过滤掉。例如,登录失败 可能会被身份代理标记,但如果网络代理没有检测到其他流量,协调者可能只会将其标记为 警告,而不是完全阻止用户。

示例输出

Title

结束语

构建这个 Autonomous SOC Swarm 是一次很好的代理编排练习。通过拆分职责,我创建了一个比黑箱模型更易解释且更易调优的系统。

未来,我计划将其连接到真实的集成点,例如 AWS GuardDutySplunk

GitHub 仓库: https://github.com/aniket-work/autonomous-soc-swarm

免责声明

此处表达的观点和意见仅代表我个人,并不代表我的雇主或我所隶属的任何组织的观点、立场或意见。内容基于我的个人经验和实验,可能不完整或不准确。任何错误或误解均非故意,如有任何陈述被误解或曲解,我在此提前致歉。

Back to Blog

相关文章

阅读更多 »