asyncio 陷阱:3 小时 Bug
Source: Dev.to
请提供您希望翻译的具体文本内容,我将为您翻译成简体中文。
场景:200 次 API 数据获取,从 40 秒降至 2 秒
原始的同步代码如下——简单但极其缓慢:
import time
import requests
def fetch_all(urls):
results = []
for url in urls:
resp = requests.get(url, timeout=5)
results.append(resp.json())
return results
urls = [f"https://api.example.com/item/{i}" for i in range(200)]
start = time.time()
data = fetch_all(urls)
print(f"耗时: {time.time() - start:.2f}s")
# 输出: 耗时: 41.23s
200 个顺序请求耗时超过 40 秒——体验极差。满怀信心,我决定用 asyncio 重写它。
核心概念:事件循环 + 协程 = 非阻塞并发
asyncio 的工作方式与多线程完全不同。它在单线程中运行一个事件循环,所有协程都在同一个线程内调度。当协程遇到 I/O 等待(网络、磁盘)时,会通过 await 主动将控制权交还给事件循环。循环随后立即切换到其他已就绪的协程执行。这样,CPU 就不会因为等待 I/O 而空转。
最基本的模式:
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.json()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
urls = [f"https://api.example.com/item/{i}" for i in range(200)]
asyncio.run(main())
asyncio.gather 会并发调度所有协程,因此总耗时大致等于最慢请求的时长,而不是所有请求时间之和。理论上很美好——但当我真正运行它时,问题就开始出现了。
Source: …
陷阱 1:协程内部的隐蔽同步阻塞调用
起初,我走了捷径,在协程中继续使用 requests.get,以为只要把它包装在 async def 中就可以了。结果事件循环在 requests.get 上彻底卡死——并发性荡然无存。
# 错误示范
import requests
async def fetch_bad(url):
resp = requests.get(url) # 同步阻塞!事件循环被堵死
return resp.json()
requests 库执行的是同步 I/O。调用后,整个线程会阻塞等待网络响应,事件循环失去控制。对于 asyncio,必须使用支持 async/await 的库——例如用于 HTTP 的 aiohttp、用于数据库查询的 aiomysql。
解决方案
将所有 I/O 替换为 async/await 生态系统中的库。对于不可避免的同步代码,可使用 loop.run_in_executor 将其转移到线程池:
import concurrent.futures
import asyncio
def sync_heavy_work(data):
# 这是一个没法改写的同步 CPU 计算
return sum(i * i for i in range(data))
async def run_in_thread(data):
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, sync_heavy_work, data)
return result
这可以防止阻塞事件循环,但不要滥用——线程切换仍然会带来开销。
陷阱 2:gather 在首次异常时抛出,丢弃其他结果
在 200 个请求中,预计会出现一些超时或 500 错误。第一次使用 gather 时,只要有一个任务抛出异常,整个 gather 就会立即抛出,其他 190 多个成功的响应全部被丢弃。
results = await asyncio.gather(*tasks) # 一个挂了,全部白干
解决方案
gather 提供了 return_exceptions=True 参数。它不会抛出异常,而是把异常对象直接放入结果列表中,这样你就可以像处理普通返回值一样处理它们:
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, res in enumerate(results):
if isinstance(res, Exception):
print(f"任务 {i} 失败: {res}")
else:
process(res)
这样,即使有 10 个任务超时,你仍然可以得到其余 190 个任务的数据。我还喜欢在 fetch 中加入超时和重试逻辑:
from aiohttp import ClientTimeout
async def fetch(session, url, retries=2):
for attempt in range(retries):
try:
async with session.get(url, timeout=ClientTimeout(total=10)) as resp:
return await resp.json()
except Exception as e:
if attempt == retries - 1:
raise
# optional: backoff before retrying