3 小时浪费在 asyncio 陷阱上,差点导致生产环境宕机
Source: Dev.to
请提供您希望翻译的正文内容,我将按照要求保留源链接并翻译成简体中文。
事件概述
上周五下午 5 点,我正准备合上笔记本电脑悄悄离开时,警报渠道突然炸开——在线数据收集服务的超时率飙升至 40 %,所有下游报告均为空。我检查日志发现,爬虫仍在使用旧的同步 requests 库一次处理一个 URL,处理成千上万的 URL。每个请求平均耗时 1.2 秒,整个轮次几乎需要 20 分钟,而业务要求必须在 5 分钟内完成。脑中唯一闪过的念头是用 asyncio 重写以实现并发,并在离开前部署。
这个决定导致了三个主要陷阱,我差点把服务弄垮。下面是我痛苦总结的教训,希望能为你节省那宝贵的三小时。
理解 asyncio
asyncio 的核心是 事件循环 加上 协程。
- 事件循环是一个不断轮询的调度器。
- 每个协程都是一个可以自愿暂停并交回控制权的任务。
当协程在等待网络响应(I/O)时,事件循环会立即切换到另一个已就绪的协程,从而防止 CPU 空转闲置。
与传统多线程最大的区别在于,asyncio 在单个线程内使用 协作式调度,避免了线程切换开销和 GIL 竞争。它在网络请求密集的场景中尤为出色。
典型模式:
async def coro():
await async_io_operation()
使用 asyncio.gather() 收集多个协程。总耗时取决于 最慢 的任务,而不是所有任务的累计时间。
Naïve implementation (what not to do)
import asyncio
import requests # 同步库,不能用!
async def fetch(url):
# 错误示范:直接把同步的 requests 放在协程里
resp = requests.get(url, timeout=5) # 这次调用会阻塞整个线程!
return resp.status_code
async def main():
urls = ["https://httpbin.org/delay/1"] * 10
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
运行后会发现所有请求仍然是 顺序 的。requests.get() 是一个阻塞调用;在等待网络响应时它不会把控制权交还给事件循环,因此一次只能运行一个协程。事件循环实际上变得毫无用处。
正确的异步 HTTP 客户端
import asyncio
import aiohttp
async def fetch(session, url):
# 使用 aiohttp 的异步请求,await 时将控制权交还事件循环
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return await resp.text()
async def main():
urls = ["https://httpbin.org/delay/1"] * 10
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"完成 {len(results)} 个请求")
asyncio.run(main())
现在事件循环真正并发地运行这十个 1 秒的请求,耗时略超过 1 秒,而不是 10 秒。在生产环境中,爬虫的执行时间从 20 分钟 降至 不到 2 分钟。
Source: …
处理异常和并发限制
当 URL 列表增长到几百个时,偶尔的超时或 DNS 失败会导致 asyncio.gather() 立即抛出异常,取消剩余的协程并使整个批次失效。
async def fetch_with_sem(sem, session, url):
async with sem: # 限制并发数,防止瞬间占满文件描述符
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
return url, await resp.text()
except Exception as e:
return url, f"ERROR: {e}"
async def main():
urls = [...] # 几百个 URL
sem = asyncio.Semaphore(50) # 限制并发,避免触发系统或服务端限制
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_sem(sem, session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True) # 关键!
for url, content in results:
if isinstance(content, Exception):
print(f"{url} 失败: {content}")
else:
process(content)
return_exceptions=True使gather返回异常对象,而不是中止整个批次。- 信号量(semaphore)限制并发连接数(例如 50),以防耗尽文件描述符或触发远程速率限制。
- 捕获异常并在需要时重试,可实现服务的稳定运行。
实用技巧
- 永远不要在协程中调用
time.sleep()– 它会阻塞整个线程。请改用await asyncio.sleep()。 - 当心无限并发 – 一次生成成千上万的连接可能会耗尽文件描述符限制或触发服务器速率限制。请使用
asyncio.Semaphore或连接池限制。 - 实现退避和重试 – 短暂的网络问题是正常的。将
return_exceptions=True与指数退避重试相结合,以实现稳健的生产级代码。
结论
使用 asyncio 重写同步 I/O‑bound 服务是你可以进行的最令人满意的优化之一。但如果不小心,这些陷阱很容易把它变成噩梦。我损失了三个小时,差点在一个稳定的生产环境的星期五出大问题。希望这篇文章能让你免于同样的命运。