我如何使用 asyncio 将并发提升 40 倍 —— 运维误以为我们被 DDoS 攻击

发布: (2026年5月1日 GMT+8 09:23)
7 分钟阅读
原文: Dev.to

Source: Dev.to

上个月,产品经理在休息室拦住我,直接说:“仪表盘又超时了。我们能不能别让老板一直刷新页面?” 当时,我们的指标同步脚本需要 11 分钟 才能跑完——200 个第三方 API 串行调用,日志里全是“一直在等待响应”的信息。我懒得解释,只是回到工位,打开编辑器,心想:这东西必须改成异步。

一周后,重写的代码上线。相同的 200 个接口,稳定地在 14 秒 内完成。监控告警立刻触发——运维以为我们遭到了 DDoS 攻击。下面讲讲这次重构是怎么实现的,asyncio 在哪里大放异彩,哪里又给我带来了坑,以及如何优雅地自救。

事件循环如何“偷取”时间

asyncio 并没有什么深奥的魔法——它只是一个单线程的 事件循环。可以把它想象成一个高度专注的调度器,它只做一件事:当任务 A 发起 HTTP 请求并在网络上等待时,调度器会挂起它,立刻转向任务 B,只有在 A 的字节到达时才会切换回来。没有线程切换的开销,没有回调地狱——所有逻辑都在 async/await 中实现。

import asyncio
import aiohttp
import time

# 模拟一次 API 调用
async def fetch_api(session, url: str) -> dict:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
        return await resp.json()

协程函数本质上只是一个蓝图,直到你创建任务并交给事件循环。一次性触发大量协程的最常用方式是 asyncio.gather —— 只需一行代码就能并发地点燃它们。总耗时不再是所有请求时间的累加,而是最慢那个请求的时长。

async def main():
    urls = [f"https://api.example.com/data/{i}" for i in range(200)]

    async with aiohttp.ClientSession() as session:
        start = time.time()

        # 同时发出 200 个请求,总耗时 ≈ 最慢的一个
        tasks = [fetch_api(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        elapsed = time.time() - start
        print(f"完成 {len(urls)} 个请求,耗时 {elapsed:.2f}s")

在同步版本中,200 个请求会顺序累加。而使用上面的代码,所有连接会一次性进入等待状态,事件循环只需要支付一次最坏情况的 I/O 时间。这是 asyncio 给我的工作流带来的最大转变——“排队等待”变成了“一次性全部到达”。

控制并发和超时,避免自找麻烦

起初我懒散地使用 gather 一口气发起所有请求,结果立刻触发上游网关的速率限制——到处都是 429 错误。随后我引入了 asyncio.Semaphore 将并发度限制在 20 个同时请求,并加入了超时和重试机制。此时管道才真正达到生产就绪的水平。

import asyncio
import aiohttp
from asyncio import Semaphore

CONCURRENCY = 20
MAX_RETRIES = 2

async def fetch_with_limit(sem, session, url):
    async with sem:   # 超过 20 个协程会在这一行排队
        for attempt in range(MAX_RETRIES + 1):
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                    resp.raise_for_status()
                    return await resp.json()
            except Exception as e:
                if attempt == MAX_RETRIES:
                    return {"error": str(e), "url": url}
                await asyncio.sleep(2 ** attempt)  # 指数退避

async def main_controlled():
    urls = [...]  # your list of URLs
    sem = Semaphore(CONCURRENCY)

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(sem, session, u) for u in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

Semaphore 的作用类似保镖——一次只能让 20 个协程进入,其余的在 await sem 处排队。加入超时、状态检查以及指数退避的重试后,你就不会把下游服务炸掉,同时还能容忍短暂的故障。

经验教训:三小时追踪一个无声的 bug

1. 忘记 await 会把协程变成幽灵代码

我一开始注意到网络请求根本没有真正发出,但日志却显示“执行成功”。半小时后我发现用了 fetch_data(url) 而不是 await fetch_data(url)。没有 await 的协程只是一个生成器对象——事件循环会完全忽略它。Python 3.8+ 会发出 RuntimeWarning,但在长代码块中很容易被忽视。 解决办法:使用 python -W error::RuntimeWarning 将警告转为异常,或显式创建 asyncio.Task

2. 在协程内部混用同步阻塞代码

我最初复用了旧代码,在 async def 中直接调用 requests.get。事件循环被阻塞,所有并发都退化为串行执行。铁律:在 async 函数内部,绝不能使用任何同步阻塞调用。要么切换到相应的 aio 库(aiohttpaiofiles 等),要么使用 loop.run_in_executor 将阻塞调用转移到线程池。

对该流水线进行重构让我明白,asyncio 的真正威力不仅在于速度——更在于能够在不引入线程复杂性的情况下处理 I/O 密集型工作负载。但这也要求我们保持纪律:尊重事件循环,控制并发,并且永远、永远要 await

0 浏览
Back to Blog

相关文章

阅读更多 »

asyncio 陷阱:3 小时 Bug

上周,我的老板让我加速一个旧的 web‑scraping 项目。我想,“没问题——我直接把 asyncio 抛进去,进行并发抓取,理论上……”

让我损失3小时的 asyncio 错误

让我损失了3小时的错误 这件事发生在去年,我在为我们的 internal operations platform 添加 batch domain liveness check 功能时。该 require…