Python을 사용한 자율 SOC 분석가 스웜 구축
Source: Dev.to
죄송합니다만, 현재 저는 외부 웹사이트의 내용을 직접 가져올 수 없습니다. 번역이 필요한 텍스트를 직접 제공해 주시면, 해당 부분을 한국어로 번역해 드리겠습니다.
TL;DR
저는 세 개의 특화된 AI 에이전트(네트워크, 아이덴티티, 위협 인텔) 가 실시간으로 보안 로그를 분석하도록 협업하는 Autonomous SOC Swarm을 구축했습니다. Coordinator 에이전트가 그들의 투표를 집계하고 자동으로 위협을 차단하거나 이상 징후를 표시합니다. 이 기사에서는 설계, Python 구현, 그리고 사이버 보안을 위한 Mixture‑of‑Agents 패턴을 시뮬레이션한 방법을 다룹니다.
Introduction
보안 운영 센터(SOC)에서는 알림 피로도가 실제 문제입니다. 분석가들은 매일 수천 건의 이벤트를 triage(분류)하면서 번아웃에 빠집니다. 저는 이렇게 생각했습니다:
경험 많은 보안 팀처럼 사고하는 AI 에이전트 팀을 만들 수 있을까?
이번 실험에서는 단일 “챗봇” 접근 방식을 넘어섰습니다. 각 에이전트가 특정 역할을 맡는 스웜을 설계했는데, 하나는 방화벽을 감시하고, 하나는 사용자 행동을 확인하며, 또 다른 하나는 위협 인텔리전스를 참고합니다. 이들에게 vote(투표)를 시키면서 오탐을 줄이고 지루한 작업을 자동화하는 것을 목표로 했습니다.
이 문서가 다루는 내용
SOC 자동화를 위한 Mixture‑of‑Agents (MoA) 시스템 구축에 대한 기술적 walkthrough. 다음을 확인할 수 있습니다:
- ECD (Event‑Context‑Decision) Architecture
- 투표 메커니즘의 Python 구현
- 터미널에서 시뮬레이션된 “Live” 대시보드
기술 스택
| 기술 | 목적 |
|---|---|
| Python 3.12 | 핵심 로직 |
| Rich | 아름다운 터미널 UI |
| Mermaid.js | 에이전트 사고 시각화 |
| Pillow | 프레임별 포렌식 애니메이션 생성 |
왜 읽어야 할까요?
멀티‑에이전트 시스템이나 사이버 보안 자동화에 관심이 있다면, 이 프로젝트는 그 사이의 간극을 메워줍니다. 단순한 이론이 아니라 복제하고 확장할 수 있는 실행 가능한 시뮬레이션입니다. 또한 로그에서 에이전트들이 판결을 두고 “논쟁”하는 모습을 보는 것은 꽤 멋집니다.
디자인해봅시다
아키텍처 개요
시스템은 허브‑앤‑스포크 모델을 따릅니다. Coordinator는 중앙에 위치하여 특화된 에이전트들로부터 입력을 받습니다.
워크플로우
- Ingest – 로그 이벤트가 도착합니다 (예: SSH 로그인).
- Analyze – 세 에이전트가 병렬로 분석합니다.
- Vote – 각 에이전트가 판정(
SAFE,SUSPICIOUS,MALICIOUS)과 신뢰도 점수를 제출합니다. - Decide – Coordinator가 투표를 가중치하여 응답을 실행합니다.
에이전트 통신
의심스러운 이벤트가 발생했을 때의 메시지 흐름은 다음과 같습니다:
요리를 시작해봅시다
Agents를 정의하는 것부터 시작했습니다. 이들은 모듈식이라 “두뇌”(단순 휴리스틱 vs. LLM)를 쉽게 교체할 수 있습니다.
1. 에이전트들
# src/agents.py
from typing import Any, Dict, List
class BaseAgent:
def __init__(self, name: str):
self.name = name
def analyze(self, log: Dict[str, Any]) -> Dict[str, Any]:
raise NotImplementedError
NetworkAgent
class NetworkAgent(BaseAgent):
def analyze(self, log: Dict[str, Any]) -> Dict[str, Any]:
# Simple heuristic for port‑scan detection
if log.get("event_type") == "port_scan":
return {
"agent": self.name,
"verdict": "malicious",
"confidence": 0.95,
"reason": f"Port scan detected from {log['source_ip']}"
}
return {"agent": self.name, "verdict": "safe", "confidence": 0.90}
CoordinatorAgent
class CoordinatorAgent(BaseAgent):
def aggregate_votes(self, votes: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Mixture‑of‑Agents voting logic."""
score = 0
for vote in votes:
if vote["verdict"] == "malicious":
score += 2
elif vote["verdict"] == "suspicious":
score += 1
if score >= 3:
return {"final_verdict": "CRITICAL", "action": "BLOCK_IP"}
elif score >= 1:
return {"final_verdict": "WARNING", "action": "FLAG_FOR_REVIEW"}
else:
return {"final_verdict": "SAFE", "action": "MONITOR"}
제 생각에, 이 간단한 점수 체계는 단일 거대한 프롬프트보다 더 견고한 경우가 많습니다. 합의를 강제하기 때문이죠.
2. 오케스트레이션
아래는 모의 로그 데이터를 생성하고, 에이전트에 전달한 뒤, 코디네이터가 결정을 내리는 최소 루프 예시입니다.
# src/main.py
import random
import time
from agents import NetworkAgent, CoordinatorAgent
# Instantiate agents
network = NetworkAgent(name="NetworkAgent")
coordinator = CoordinatorAgent(name="Coordinator")
def mock_log():
"""Generate a random log entry."""
events = ["ssh_login", "port_scan", "file_access"]
event = random.choice(events)
return {
"event_type": event,
"source_ip": f"10.0.{random.randint(0,255)}.{random.randint(1,254)}",
"user": f"user{random.randint(1,5)}"
}
def run():
while True:
log = mock_log()
vote = network.analyze(log) # In a full impl you’d call all agents
decision = coordinator.aggregate_votes([vote])
print(f"[{log['event_type']}] → {decision['final_verdict']} – {decision['action']}")
time.sleep(2)
if __name__ == "__main__":
run()
스크립트를 실행하면 (전체 레포지토리에서는 Rich로 강화된) 실시간 업데이트 터미널 화면이 나타나며, 에이전트들의 투표와 코디네이터의 최종 행동을 보여줍니다.
마무리
- Mixture‑of‑Agents는 SOC 작업을 자동화할 수 있는 유연하고 내결함성(fault‑tolerant) 방식을 제공합니다.
- 투표 메커니즘은 확장이 용이합니다: 에이전트를 더 추가하고, 투표 가중치를 다르게 설정하거나, LLM‑기반 추론을 연결할 수 있습니다.
- 전체 프로젝트(더 풍부한 UI, 추가 에이전트, Docker 지원 포함)는 오픈‑소스이며, 자유롭게 복제하고, 실험하며, 기여하실 수 있습니다!
행복한 해킹!
자율 SOC 스웜 데모
아래는 rich를 사용하여 사고 과정을 출력하는 코드 조각입니다.
# main.py
with Live(table, refresh_per_second=4) as live:
for incident in generator.generate_stream(count=15):
# ...
votes = [
network_agent.analyze(incident),
identity_agent.analyze(incident),
intel_agent.analyze(incident)
]
decision = coordinator.aggregate_votes(votes)
# ... print to table ...
이렇게 하면 도구가 실제 CLI 제품처럼 느껴져 개발 중에 훌륭한 피드백 루프를 제공합니다.
설정하기
저장소 복제
git clone https://github.com/aniket-work/autonomous-soc-swarm
cd autonomous-soc-swarm
의존성 설치
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
실행해 보기
시뮬레이션을 실행하는 것은 간단합니다:
python main.py
스웜이 이벤트를 처리하는 동안 False Positives가 필터링되는 것을 명확히 확인할 수 있었습니다. 예를 들어, Failed Login이 Identity Agent에 의해 표시될 수 있지만, Network Agent가 다른 트래픽을 감지하지 못하면 Coordinator는 사용자를 완전히 차단하기보다는 Warning으로 표시할 수 있습니다.
예시 출력
마무리 생각
이 Autonomous SOC Swarm을 구축하는 것은 에이전트 오케스트레이션을 연습하는 훌륭한 기회였습니다. 책임을 분할함으로써, 저는 블랙‑박스 모델보다 설명 가능하고 조정하기 쉬운 시스템을 만들었습니다.
앞으로는 AWS GuardDuty 또는 Splunk와 같은 실제 통합 포인트에 연결할 계획입니다.
GitHub 저장소: https://github.com/aniket-work/autonomous-soc-swarm
면책 조항
여기에 표현된 견해와 의견은 전적으로 저 개인의 것이며, 제 고용주나 제가 소속된 어떤 조직의 견해, 입장, 의견을 대변하지 않습니다. 이 내용은 저의 개인적인 경험과 실험을 바탕으로 하며, 불완전하거나 부정확할 수 있습니다. 오류나 오해는 의도된 것이 아니며, 진술이 오해되거나 잘못 전달될 경우 미리 사과드립니다.



