我如何为云流量构建自适应的‘免疫系统’
Source: Dev.to

架构概览:内部工作原理
这不仅仅是一个在真空中运行的脚本。为了让它在生产环境中工作,我部署了一个镜像真实 DevOps 架构的堆栈:
The Source: A Nextcloud instance running in Docker.
The Proxy: Nginx, configured to write JSON access logs to a specific path.
The Bridge: A named Docker volume (HNG-nginx-logs) shared between Nginx (writer) and my Python daemon (reader).
The Brain: A multi‑module Python engine that tails these logs in real‑time.
1. 滑动窗口:超越简单计数器
大多数初学者使用每分钟重置的整数计数器。这是错误的:如果攻击者在一分钟的最后 10 秒内发送 1,000 次请求,计数器可能会错过“峰值”。
我使用了基于时间的滑动窗口,配合 collections.deque:
from collections import deque
import time
# Each IP has its own deque of timestamps
window = deque()
def process_request():
now = time.time()
window.append(now) # Add the new hit
# Eviction logic: keep only the last 60 seconds
while window and window[0] < now - 60:
window.popleft()
检测标准(先满足的即标记为异常):
- Z‑score > 3.0 → 流量偏离平均值 3 个标准差。
- 5× 规则 → 当前速率超过基线均值的 5 倍。
4. “零信任”错误激增
攻击者常留下 404 Not Found(扫描)或 500 Internal Server Error(崩溃)的痕迹。
引擎会跟踪每个 IP 的错误率。如果某 IP 的 4xx/5xx 错误超过基线错误率的 3 倍,检测阈值会从 Z‑score 3.0 降至 1.5,去除“疑似”优势。
5. 执行与封禁生命周期
没有行动的检测毫无意义。当触发封禁时,引擎直接与 Linux 内核交互:
# Inject a DROP rule into iptables
iptables -I INPUT -s <IP_ADDRESS> -j DROP
- 退避计划:封禁按阶段递进 — 10 分钟 → 30 分钟 → 2 小时 → 永久。
- 告警:在 10 秒内发送 Slack 通知,包含 Z‑score、当前速率和基线信息。
6. 实时可观测性
实时指标 UI 充当控制室。使用 Flask 构建并每 3 秒刷新,提供完整可视化:
- 全局请求/秒 与 学习得到的有效均值/标准差。
- 带有“剩余时间”倒计时的被封禁 IP。
- 前 10 名来源 IP 与系统健康状态(CPU/内存)。
经验教训
最大的收获:DevOps 关注的是观察,而不仅仅是维护。最难的部分不是架构,而是数学。我花了大量时间微调阈值,以便系统能够区分成功的产品发布和真实的 DDoS 攻击。
真实场景的怪癖:在测试期间,我意外封禁了自己的 Docker 网关 (172.18.0.1)。Nginx 通过 Docker 桥接看到内部流量,引擎将网关标记为激进攻击者并将其锁定。这迫使我为内部 CIDR 范围制定更稳健的白名单策略,证明即使是最好的数学也必须基于你特定网络的现实。