asyncio 陷阱:3 小时 Bug

发布: (2026年5月1日 GMT+8 09:21)
5 分钟阅读
原文: Dev.to

Source: Dev.to

请提供您希望翻译的具体文本内容,我将为您翻译成简体中文。

场景:200 次 API 数据获取,从 40 秒降至 2 秒

原始的同步代码如下——简单但极其缓慢:

import time
import requests

def fetch_all(urls):
    results = []
    for url in urls:
        resp = requests.get(url, timeout=5)
        results.append(resp.json())
    return results

urls = [f"https://api.example.com/item/{i}" for i in range(200)]
start = time.time()
data = fetch_all(urls)
print(f"耗时: {time.time() - start:.2f}s")
# 输出: 耗时: 41.23s

200 个顺序请求耗时超过 40 秒——体验极差。满怀信心,我决定用 asyncio 重写它。

核心概念:事件循环 + 协程 = 非阻塞并发

asyncio 的工作方式与多线程完全不同。它在单线程中运行一个事件循环,所有协程都在同一个线程内调度。当协程遇到 I/O 等待(网络、磁盘)时,会通过 await 主动将控制权交还给事件循环。循环随后立即切换到其他已就绪的协程执行。这样,CPU 就不会因为等待 I/O 而空转。

最基本的模式:

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

urls = [f"https://api.example.com/item/{i}" for i in range(200)]
asyncio.run(main())

asyncio.gather 会并发调度所有协程,因此总耗时大致等于最慢请求的时长,而不是所有请求时间之和。理论上很美好——但当我真正运行它时,问题就开始出现了。

Source:

陷阱 1:协程内部的隐蔽同步阻塞调用

起初,我走了捷径,在协程中继续使用 requests.get,以为只要把它包装在 async def 中就可以了。结果事件循环在 requests.get 上彻底卡死——并发性荡然无存。

# 错误示范
import requests

async def fetch_bad(url):
    resp = requests.get(url)   # 同步阻塞!事件循环被堵死
    return resp.json()

requests 库执行的是同步 I/O。调用后,整个线程会阻塞等待网络响应,事件循环失去控制。对于 asyncio,必须使用支持 async/await 的库——例如用于 HTTP 的 aiohttp、用于数据库查询的 aiomysql

解决方案

将所有 I/O 替换为 async/await 生态系统中的库。对于不可避免的同步代码,可使用 loop.run_in_executor 将其转移到线程池:

import concurrent.futures
import asyncio

def sync_heavy_work(data):
    # 这是一个没法改写的同步 CPU 计算
    return sum(i * i for i in range(data))

async def run_in_thread(data):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, sync_heavy_work, data)
    return result

这可以防止阻塞事件循环,但不要滥用——线程切换仍然会带来开销。

陷阱 2:gather 在首次异常时抛出,丢弃其他结果

在 200 个请求中,预计会出现一些超时或 500 错误。第一次使用 gather 时,只要有一个任务抛出异常,整个 gather 就会立即抛出,其他 190 多个成功的响应全部被丢弃。

results = await asyncio.gather(*tasks)  # 一个挂了,全部白干

解决方案

gather 提供了 return_exceptions=True 参数。它不会抛出异常,而是把异常对象直接放入结果列表中,这样你就可以像处理普通返回值一样处理它们:

results = await asyncio.gather(*tasks, return_exceptions=True)

for i, res in enumerate(results):
    if isinstance(res, Exception):
        print(f"任务 {i} 失败: {res}")
    else:
        process(res)

这样,即使有 10 个任务超时,你仍然可以得到其余 190 个任务的数据。我还喜欢在 fetch 中加入超时和重试逻辑:

from aiohttp import ClientTimeout

async def fetch(session, url, retries=2):
    for attempt in range(retries):
        try:
            async with session.get(url, timeout=ClientTimeout(total=10)) as resp:
                return await resp.json()
        except Exception as e:
            if attempt == retries - 1:
                raise
            # optional: backoff before retrying
0 浏览
Back to Blog

相关文章

阅读更多 »

让我损失3小时的 asyncio 错误

让我损失了3小时的错误 这件事发生在去年,我在为我们的 internal operations platform 添加 batch domain liveness check 功能时。该 require…