The asyncio Mistake That Cost Me 3 Hours
Source: Dev.to
It happened last year when I was adding a batch domain liveness check feature to our internal operations platform.
The requirement was simple: periodically poll 1 000+ domains, check HTTP status codes, and flag any domain as down if it didn’t respond within 5 seconds.
I assumed this was an I/O‑bound task, so I reached for asyncio. I wrote a bunch of async def, await, and asyncio.gather calls, ran it with confidence… and the result?
1 000 domains took over four minutes – almost indistinguishable from a synchronous, sequential implementation.
Over the next three hours my understanding of asyncio was dismantled and rebuilt. If you’ve ever accidentally blocked the event loop inside an async function, or lost exceptions inside gather without even realizing it, this war story should save you more than three hours.
Initial (Incorrect) Implementation
Code
import asyncio
import time
import requests

async def check_domain(url: str) -> dict:
    """Check a single domain's status code and response time."""
    start = time.monotonic()
    try:
        # Note: requests is a synchronous library
        resp = requests.get(url, timeout=5, allow_redirects=True)
        status = resp.status_code
    except Exception as e:
        status = str(e)
    elapsed = time.monotonic() - start
    return {"url": url, "status": status, "elapsed": elapsed}

async def main():
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]  # simulate slow endpoints
    t_start = time.monotonic()
    # Hoping these all run concurrently
    results = await asyncio.gather(*[check_domain(url) for url in urls])
    t_end = time.monotonic()
    print(f"Total time {t_end - t_start:.2f}s for {len(results)} checks")
    # Print the first 3 results
    for r in results[:3]:
        print(r)

if __name__ == "__main__":
    asyncio.run(main())
You might spot the issue immediately: calling the synchronous blocking requests.get inside an async coroutine.
Back then I was fixated on “I defined it with async def, so it’s a coroutine, and gather will make it concurrent,” completely ignoring how the event loop actually works.
Fifty URLs, each with a 1‑second delay, resulted in a total runtime of over 50 seconds – a textbook case of sequential requests hidden behind async syntax.
Why It Doesn’t Work
- asyncio runs on a single-threaded event loop. async def alone does not add concurrency; it only marks the function as a coroutine.
- Real concurrency happens when the coroutine awaits an asynchronous object (e.g., an aiohttp request).
- requests.get performs synchronous socket I/O; while it blocks, the whole thread (and thus the event loop) is stuck.
- asyncio.gather will still schedule the coroutines, but each one blocks the loop until its requests.get returns, so they run one after another.
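The effect is easy to reproduce without any network at all. In this minimal sketch (task names are hypothetical), time.sleep stands in for a blocking call like requests.get, and asyncio.sleep stands in for a true awaitable: five blocking "coroutines" take roughly the sum of their sleeps, while five awaiting ones take roughly the longest single sleep.

```python
import asyncio
import time

async def blocking_task():
    # time.sleep blocks the event loop's only thread, like requests.get does
    time.sleep(0.2)

async def nonblocking_task():
    # asyncio.sleep yields control back to the event loop while waiting
    await asyncio.sleep(0.2)

async def timed_gather(coro_factory, n=5):
    # Run n copies of the coroutine "concurrently" and measure wall time
    start = time.monotonic()
    await asyncio.gather(*(coro_factory() for _ in range(n)))
    return time.monotonic() - start

blocking_elapsed = asyncio.run(timed_gather(blocking_task))
nonblocking_elapsed = asyncio.run(timed_gather(nonblocking_task))
print(f"blocking: {blocking_elapsed:.2f}s, non-blocking: {nonblocking_elapsed:.2f}s")
```

The blocking version takes about 1 second (5 × 0.2 s, sequential); the non-blocking one takes about 0.2 seconds, because the loop interleaves the waits.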
Correct Implementation with aiohttp
Code
import aiohttp
import asyncio
import time

async def check_domain_async(session: aiohttp.ClientSession, url: str) -> dict:
    """A genuinely asynchronous check."""
    start = time.monotonic()
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
            status = resp.status
    except Exception as e:
        status = str(e)
    elapsed = time.monotonic() - start
    return {"url": url, "status": status, "elapsed": elapsed}

async def main_async():
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]
    t_start = time.monotonic()
    async with aiohttp.ClientSession() as session:
        tasks = [check_domain_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    t_end = time.monotonic()
    print(f"Total time {t_end - t_start:.2f}s for {len(results)} checks")

if __name__ == "__main__":
    asyncio.run(main_async())
With aiohttp, all 50 requests finish in about 1.5 seconds (assuming the server can keep up). The speedup is dramatic, and the pitfall is eliminated.
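If migrating to aiohttp isn't an option (say, a legacy client library is synchronous-only), a reasonable fallback is to push the blocking calls onto worker threads with asyncio.to_thread (Python 3.9+), which keeps the event loop responsive. A minimal sketch, with blocking_check as a hypothetical stand-in for a requests-based function:

```python
import asyncio
import time

def blocking_check(url: str) -> dict:
    # Stand-in for a synchronous call such as requests.get
    time.sleep(0.2)
    return {"url": url, "status": 200}

async def main():
    start = time.monotonic()
    # asyncio.to_thread runs each blocking call in the default thread pool,
    # so the event loop is free while the worker threads wait on I/O
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_check, f"https://example.com/{i}") for i in range(10))
    )
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(f"{len(results)} checks in {elapsed:.2f}s")
```

This won't match aiohttp's scalability (threads are heavier than coroutines, and the default pool is bounded), but it turns N sequential blocking calls into pool-sized batches without rewriting the client code.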
Handling Exceptions with asyncio.gather
A more subtle trap appears when you need robust error handling. By default (return_exceptions=False), as soon as any awaitable passed to gather raises, gather propagates that first exception to the awaiting code; the remaining tasks are not cancelled and keep running in the background, and any exceptions they raise later are effectively swallowed.
try:
    results = await asyncio.gather(*tasks)
except Exception:
    logger.error("Batch check failed")
In this pattern:
- The exception is caught, but you lose information about which task failed and why.
- The other tasks continue running, which may be undesirable if you want to abort early.
Safer Patterns
- Use return_exceptions=True to collect results and exceptions together:
results = await asyncio.gather(*tasks, return_exceptions=True)
for idx, result in enumerate(results):
    if isinstance(result, Exception):
        logger.error(f"Task {idx} failed: {result}")
    else:
        process(result)
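Here is a self-contained sketch of that pattern, using a hypothetical flaky coroutine in which one task fails: the exception comes back as an ordinary value in the results list, positionally aligned with its task, instead of aborting the whole batch.

```python
import asyncio

async def flaky(i: int) -> int:
    # Task 2 is deliberately broken to demonstrate error collection
    if i == 2:
        raise ValueError(f"task {i} failed")
    return i * 10

async def main():
    tasks = [flaky(i) for i in range(4)]
    # return_exceptions=True: failures are returned in-place, not raised
    return await asyncio.gather(*tasks, return_exceptions=True)

results = asyncio.run(main())
for idx, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Task {idx} failed: {result}")
    else:
        print(f"Task {idx} -> {result}")
```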
- Or cancel remaining tasks manually when the first failure occurs:
tasks = [asyncio.create_task(coro) for coro in coros]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
for task in pending:
    task.cancel()
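A runnable version of the fail-fast pattern might look like this (worker is a hypothetical coroutine whose second instance fails): asyncio.wait returns at the first exception, everything still in flight is cancelled, and the failure can be inspected on the completed tasks.

```python
import asyncio

async def worker(i: int) -> int:
    await asyncio.sleep(i * 0.1)
    if i == 1:
        # Deliberate failure to trigger FIRST_EXCEPTION
        raise RuntimeError(f"worker {i} blew up")
    return i

async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(5)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # Cancel everything still in flight once the first failure is seen
    for task in pending:
        task.cancel()
    # Let the cancellations settle without re-raising
    await asyncio.gather(*pending, return_exceptions=True)
    errors = [t.exception() for t in done if t.exception() is not None]
    return len(done), len(pending), errors

done_count, pending_count, errors = asyncio.run(main())
print(done_count, pending_count, [str(e) for e in errors])
```

Workers 0 and 1 complete before the failure is observed (done_count == 2), while workers 2-4 are cancelled before finishing, so the batch aborts after roughly 0.1 seconds instead of 0.4.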
Understanding gather’s exception semantics prevents silent failures and makes your concurrent code more reliable.
Takeaway: Declaring a function with async def is only half the story. You must use truly asynchronous I/O (e.g., aiohttp) and be aware of how asyncio.gather propagates exceptions. Master these details, and you’ll avoid spending hours chasing phantom performance bugs.