让我损失3小时的 asyncio 错误
Source: Dev.to
它发生在去年,当时我正在为我们的内部运营平台添加一个 批量域名存活检查 功能。
需求很简单:定期轮询 1 000 多个域名,检查 HTTP 状态码,如果在 5 秒内没有响应则将该域名标记为宕机。
我以为这是一个 I/O‑bound 任务,于是我使用了 asyncio。我写了一堆 async def、await 和 asyncio.gather 调用,自信满满地运行……结果呢?
1 000 个域名用了超过四分钟——几乎和同步、顺序实现没有区别。
接下来的三个小时里,我对 asyncio 的理解被彻底拆解并重新构建。如果你曾经在 async 函数内部不小心阻塞了事件循环,或者在 gather 中丢失了异常而毫不知情,这个战争故事应该能帮你省下超过三小时的时间。
初始(错误)实现
代码
import asyncio
import time
import requests
async def check_domain(url: str) -> dict:
"""检测单个域名的状态码和耗时"""
start = time.monotonic()
try:
# 注意这里用的是 requests,同步库
resp = requests.get(url, timeout=5, allow_redirects=True)
status = resp.status_code
except Exception as e:
status = str(e)
elapsed = time.monotonic() - start
return {"url": url, "status": status, "elapsed": elapsed}
async def main():
urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)] # 模拟慢速接口
t_start = time.monotonic()
# 希望全部并发
results = await asyncio.gather(*[check_domain(url) for url in urls])
t_end = time.monotonic()
print(f"总耗时 {t_end - t_start:.2f} 秒,完成 {len(results)} 个检测")
# 打印前 3 个结果
for r in results[:3]:
print(r)
if __name__ == "__main__":
asyncio.run(main())
你可能会立刻发现问题:在 async 协程内部调用同步阻塞的 requests.get。
当时我执着于“我用了 async def 定义它,所以它是协程,gather 会让它并发”,完全忽视了事件循环的实际工作方式。
50 个 URL,每个都有 1 秒的延迟,导致总运行时间 超过 50 秒——这是一个典型的在 async 语法下隐藏的顺序请求案例。
为什么它不起作用
asyncio在 单线程事件循环 上运行。- 单独使用
async def并 不会 增加并发;它仅将函数标记为 可能 可等待的。 - 真正的并发发生在协程 await 一个 异步 对象时(例如
aiohttp请求)。 requests.get执行 同步套接字 I/O;在阻塞期间,整个线程(以及事件循环)都会卡住。asyncio.gather仍会调度协程,但每个协程都会阻塞循环,直到其requests.get返回,因此它们是 一个接一个 运行的。
使用 aiohttp 的正确实现
代码
import aiohttp
import asyncio
import time
async def check_domain_async(session: aiohttp.ClientSession, url: str) -> dict:
"""真正的异步检测"""
start = time.monotonic()
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
status = resp.status
except Exception as e:
status = str(e)
elapsed = time.monotonic() - start
return {"url": url, "status": status, "elapsed": elapsed}
async def main_async():
urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]
t_start = time.monotonic()
async with aiohttp.ClientSession() as session:
tasks = [check_domain_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
t_end = time.monotonic()
print(f"总耗时 {t_end - t_start:.2f} 秒,完成 {len(results)} 个检测")
使用 aiohttp,所有 50 个请求大约在 1.5 秒 内完成(假设服务器能够跟上)。加速效果显著,且避免了之前的陷阱。
Source: …
使用 asyncio.gather 处理异常
当需要进行健壮的错误处理时,会出现一个更微妙的陷阱。默认情况下,如果传递给 gather 的 任意 协程抛出异常,gather 会在 所有任务完成后 传播该异常,这可能会掩盖其他协程的错误。
try:
results = await asyncio.gather(*tasks)
except Exception:
logger.error("批量检测出错")
在这种模式下:
- 捕获了异常,但你失去了关于 哪个 任务失败以及 为什么 失败的信息。
- 其他任务会继续运行,如果你希望提前中止,这可能并不理想。
更安全的模式
- 使用
return_exceptions=True将结果和异常一起收集:
results = await asyncio.gather(*tasks, return_exceptions=True)
for idx, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Task {idx} failed: {result}")
else:
process(result)
- 或者在第一次失败时手动取消剩余任务:
tasks = [asyncio.create_task(coro) for coro in coros]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
for task in pending:
task.cancel()
理解 gather 的异常语义可以防止静默失败,使并发代码更加可靠。
要点: 仅使用 async def 声明函数只是故事的一半。你必须使用真正的异步 I/O(例如 aiohttp),并且要了解 asyncio.gather 如何传播异常。掌握这些细节,就能避免花费数小时追踪虚幻的性能 bug。