让我损失3小时的 asyncio 错误

发布: (2026年5月1日 GMT+8 09:53)
6 分钟阅读
原文: Dev.to

Source: Dev.to

它发生在去年,当时我正在为我们的内部运营平台添加一个 批量域名存活检查 功能。
需求很简单:定期轮询 1 000 多个域名,检查 HTTP 状态码,如果在 5 秒内没有响应则将该域名标记为宕机。

我以为这是一个 I/O‑bound 任务,于是我使用了 asyncio。我写了一堆 async defawaitasyncio.gather 调用,自信满满地运行……结果呢?
1 000 个域名用了超过四分钟——几乎和同步、顺序实现没有区别。

接下来的三个小时里,我对 asyncio 的理解被彻底拆解并重新构建。如果你曾经在 async 函数内部不小心阻塞了事件循环,或者在 gather 中丢失了异常而毫不知情,这个战争故事应该能帮你省下超过三小时的时间。

初始(错误)实现

代码

import asyncio
import time
import requests

async def check_domain(url: str) -> dict:
    """检测单个域名的状态码和耗时"""
    start = time.monotonic()
    try:
        # 注意这里用的是 requests,同步库
        resp = requests.get(url, timeout=5, allow_redirects=True)
        status = resp.status_code
    except Exception as e:
        status = str(e)
    elapsed = time.monotonic() - start
    return {"url": url, "status": status, "elapsed": elapsed}

async def main():
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]  # 模拟慢速接口
    t_start = time.monotonic()
    # 希望全部并发
    results = await asyncio.gather(*[check_domain(url) for url in urls])
    t_end = time.monotonic()
    print(f"总耗时 {t_end - t_start:.2f} 秒,完成 {len(results)} 个检测")
    # 打印前 3 个结果
    for r in results[:3]:
        print(r)

if __name__ == "__main__":
    asyncio.run(main())

你可能会立刻发现问题:在 async 协程内部调用同步阻塞的 requests.get
当时我执着于“我用了 async def 定义它,所以它是协程,gather 会让它并发”,完全忽视了事件循环的实际工作方式。

50 个 URL,每个都有 1 秒的延迟,导致总运行时间 超过 50 秒——这是一个典型的在 async 语法下隐藏的顺序请求案例。

为什么它不起作用

  • asyncio单线程事件循环 上运行。
  • 单独使用 async def不会 增加并发;它仅将函数标记为 可能 可等待的。
  • 真正的并发发生在协程 await 一个 异步 对象时(例如 aiohttp 请求)。
  • requests.get 执行 同步套接字 I/O;在阻塞期间,整个线程(以及事件循环)都会卡住。
  • asyncio.gather 仍会调度协程,但每个协程都会阻塞循环,直到其 requests.get 返回,因此它们是 一个接一个 运行的。

使用 aiohttp 的正确实现

代码

import aiohttp
import asyncio
import time

async def check_domain_async(session: aiohttp.ClientSession, url: str) -> dict:
    """真正的异步检测"""
    start = time.monotonic()
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
            status = resp.status
    except Exception as e:
        status = str(e)
    elapsed = time.monotonic() - start
    return {"url": url, "status": status, "elapsed": elapsed}

async def main_async():
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]
    t_start = time.monotonic()
    async with aiohttp.ClientSession() as session:
        tasks = [check_domain_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    t_end = time.monotonic()
    print(f"总耗时 {t_end - t_start:.2f} 秒,完成 {len(results)} 个检测")

使用 aiohttp,所有 50 个请求大约在 1.5 秒 内完成(假设服务器能够跟上)。加速效果显著,且避免了之前的陷阱。

Source:

使用 asyncio.gather 处理异常

当需要进行健壮的错误处理时,会出现一个更微妙的陷阱。默认情况下,如果传递给 gather任意 协程抛出异常,gather 会在 所有任务完成后 传播该异常,这可能会掩盖其他协程的错误。

try:
    results = await asyncio.gather(*tasks)
except Exception:
    logger.error("批量检测出错")

在这种模式下:

  • 捕获了异常,但你失去了关于 哪个 任务失败以及 为什么 失败的信息。
  • 其他任务会继续运行,如果你希望提前中止,这可能并不理想。

更安全的模式

  • 使用 return_exceptions=True 将结果和异常一起收集:
results = await asyncio.gather(*tasks, return_exceptions=True)
for idx, result in enumerate(results):
    if isinstance(result, Exception):
        logger.error(f"Task {idx} failed: {result}")
    else:
        process(result)
  • 或者在第一次失败时手动取消剩余任务:
tasks = [asyncio.create_task(coro) for coro in coros]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

for task in pending:
    task.cancel()

理解 gather 的异常语义可以防止静默失败,使并发代码更加可靠。


要点: 仅使用 async def 声明函数只是故事的一半。你必须使用真正的异步 I/O(例如 aiohttp),并且要了解 asyncio.gather 如何传播异常。掌握这些细节,就能避免花费数小时追踪虚幻的性能 bug。

0 浏览
Back to Blog

相关文章

阅读更多 »