我如何使用 asyncio 将并发提升 40 倍 —— 运维误以为我们被 DDoS 攻击
Source: Dev.to
上个月,产品经理在休息室拦住我,直接说:“仪表盘又超时了。我们能不能别让老板一直刷新页面?” 当时,我们的指标同步脚本需要 11 分钟 才能跑完——200 个第三方 API 串行调用,日志里全是“一直在等待响应”的信息。我懒得解释,只是回到工位,打开编辑器,心想:这东西必须改成异步。
一周后,重写的代码上线。相同的 200 个接口,稳定地在 14 秒 内完成。监控告警立刻触发——运维以为我们遭到了 DDoS 攻击。下面讲讲这次重构是怎么实现的,asyncio 在哪里大放异彩,哪里又给我带来了坑,以及如何优雅地自救。
事件循环如何“偷取”时间
asyncio 并没有什么深奥的魔法——它只是一个单线程的 事件循环。可以把它想象成一个高度专注的调度器,它只做一件事:当任务 A 发起 HTTP 请求并在网络上等待时,调度器会挂起它,立刻转向任务 B,只有在 A 的字节到达时才会切换回来。没有线程切换的开销,没有回调地狱——所有逻辑都在 async/await 中实现。
import asyncio
import aiohttp
import time
# 模拟一次 API 调用
async def fetch_api(session, url: str) -> dict:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return await resp.json()
协程函数本质上只是一个蓝图,直到你创建任务并交给事件循环。一次性触发大量协程的最常用方式是 asyncio.gather —— 只需一行代码就能并发地点燃它们。总耗时不再是所有请求时间的累加,而是最慢那个请求的时长。
async def main():
urls = [f"https://api.example.com/data/{i}" for i in range(200)]
async with aiohttp.ClientSession() as session:
start = time.time()
# 同时发出 200 个请求,总耗时 ≈ 最慢的一个
tasks = [fetch_api(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.time() - start
print(f"完成 {len(urls)} 个请求,耗时 {elapsed:.2f}s")
在同步版本中,200 个请求会顺序累加。而使用上面的代码,所有连接会一次性进入等待状态,事件循环只需要支付一次最坏情况的 I/O 时间。这是 asyncio 给我的工作流带来的最大转变——“排队等待”变成了“一次性全部到达”。
控制并发和超时,避免自找麻烦
起初我懒散地使用 gather 一口气发起所有请求,结果立刻触发上游网关的速率限制——到处都是 429 错误。随后我引入了 asyncio.Semaphore 将并发度限制在 20 个同时请求,并加入了超时和重试机制。此时管道才真正达到生产就绪的水平。
import asyncio
import aiohttp
from asyncio import Semaphore
CONCURRENCY = 20
MAX_RETRIES = 2
async def fetch_with_limit(sem, session, url):
async with sem: # 超过 20 个协程会在这一行排队
for attempt in range(MAX_RETRIES + 1):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
resp.raise_for_status()
return await resp.json()
except Exception as e:
if attempt == MAX_RETRIES:
return {"error": str(e), "url": url}
await asyncio.sleep(2 ** attempt) # 指数退避
async def main_controlled():
urls = [...] # your list of URLs
sem = Semaphore(CONCURRENCY)
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(sem, session, u) for u in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
Semaphore 的作用类似保镖——一次只能让 20 个协程进入,其余的在 await sem 处排队。加入超时、状态检查以及指数退避的重试后,你就不会把下游服务炸掉,同时还能容忍短暂的故障。
经验教训:三小时追踪一个无声的 bug
1. 忘记 await 会把协程变成幽灵代码
我一开始注意到网络请求根本没有真正发出,但日志却显示“执行成功”。半小时后我发现用了 fetch_data(url) 而不是 await fetch_data(url)。没有 await 的协程只是一个生成器对象——事件循环会完全忽略它。Python 3.8+ 会发出 RuntimeWarning,但在长代码块中很容易被忽视。 解决办法:使用 python -W error::RuntimeWarning 将警告转为异常,或显式创建 asyncio.Task。
2. 在协程内部混用同步阻塞代码
我最初复用了旧代码,在 async def 中直接调用 requests.get。事件循环被阻塞,所有并发都退化为串行执行。铁律:在 async 函数内部,绝不能使用任何同步阻塞调用。要么切换到相应的 aio 库(aiohttp、aiofiles 等),要么使用 loop.run_in_executor 将阻塞调用转移到线程池。
对该流水线进行重构让我明白,asyncio 的真正威力不仅在于速度——更在于能够在不引入线程复杂性的情况下处理 I/O 密集型工作负载。但这也要求我们保持纪律:尊重事件循环,控制并发,并且永远、永远要 await。