3 Asyncio Pitfalls That Took Me 3 Hours to Debug and Almost Crashed Production
Source: Dev.to
Introduction
Last week my lead asked me to speed up a data‑aggregation service that calls 20 downstream APIs. The serial version took about 18 seconds, so I tried to refactor it with asyncio. What should have been a quick win turned into three hours of debugging and a near‑production outage. Below are the three biggest pitfalls I encountered and how to avoid them.
Asyncio Basics
- Event loop – a single‑threaded scheduler that runs coroutines.
- async def – defines a coroutine function.
- await – yields control back to the loop while waiting for an awaitable.
A textbook example:
```python
import asyncio

async def fetch(url):
    await asyncio.sleep(1)  # simulate network I/O
    return f"data from {url}"

async def main():
    tasks = [fetch(f"api/{i}") for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
```
Five requests complete in roughly one second—if everything is truly asynchronous.
Pitfall 1: Using await Inside a Synchronous Function
I dropped await fetch() straight into a Flask route:
```python
@app.route("/data")
def get_data():
    result = await fetch(...)
    return result
```
- Error: SyntaxError: 'await' outside async function.
- Fix: Convert the view to an async function or run the coroutine from a background event loop.
Flask handles requests in a thread pool, and each worker thread has no event loop. Spinning up a fresh loop per request with asyncio.run() is wasteful, and if the calling thread already has a running loop it raises RuntimeError: asyncio.run() cannot be called from a running event loop. Solutions:
- Switch to an async‑native framework (e.g., Quart, FastAPI).
- Create a global event loop at startup and schedule work with loop.run_until_complete().
- Run a dedicated asyncio thread and communicate via a queue.
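The "dedicated asyncio thread" option can be sketched as follows. This is a minimal, self-contained illustration (fetch and the URL are stand-ins, not the real service code): one background event loop is started at application startup, and synchronous views submit coroutines to it with asyncio.run_coroutine_threadsafe and block on the returned future.

```python
import asyncio
import threading

# Start one background event loop at application startup.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def fetch(url):
    await asyncio.sleep(0.01)  # simulate network I/O
    return f"data from {url}"

# Inside a synchronous view (e.g. a Flask route), schedule the
# coroutine on the background loop and wait for its result.
def get_data():
    future = asyncio.run_coroutine_threadsafe(fetch("api/1"), loop)
    return future.result(timeout=5)

print(get_data())  # -> data from api/1
```

Because every request shares the same loop, coroutines from concurrent requests still interleave on it, and no per-request loop setup is paid.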
Pitfall 2: Mixing Blocking I/O with Async Code
I tried to parallelise 20 calls with:
```python
results = await asyncio.gather(*[call_api_blocking(i) for i in range(20)])
```
call_api_blocking used requests.get(), a synchronous, blocking call. The result was still ~18 seconds: each requests.get() blocked the event loop's only thread, so the calls ran one after another despite the gather.
Solution: Offload blocking calls to a thread pool:
```python
import asyncio
import requests

async def call_api_async(url):
    loop = asyncio.get_running_loop()
    # requests.get runs in the default thread pool, leaving the
    # event loop free to schedule other coroutines meanwhile.
    return await loop.run_in_executor(None, requests.get, url)
```
Better yet, replace requests with an async HTTP client such as aiohttp. The rule of thumb: async code must use async I/O primitives exclusively.
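If you have to stay on a blocking client for now, Python 3.9+ also provides asyncio.to_thread, a thinner wrapper over run_in_executor. A minimal sketch, with a dummy call_api_blocking standing in for requests.get:

```python
import asyncio
import time

def call_api_blocking(i):
    time.sleep(0.1)  # stand-in for a blocking requests.get(...)
    return i * 2

async def main():
    # Each blocking call runs in its own worker thread, so the
    # event loop stays free and the calls overlap.
    return await asyncio.gather(
        *(asyncio.to_thread(call_api_blocking, i) for i in range(5))
    )

print(asyncio.run(main()))  # -> [0, 2, 4, 6, 8], in ~0.1s not ~0.5s
```

Note that thread offloading only hides the blocking; a true async client scales further because it needs no thread per in-flight request.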
Pitfall 3: Improper Task Management Leading to Memory Leaks
After fixing performance, the service started getting OOM‑killed. The culprit was a manual task list:
```python
tasks = []
for url in urls:
    task = asyncio.create_task(process(url))
    tasks.append(task)

for t in tasks:
    await t
```
If one process(url) raised, the await loop aborted and the remaining tasks stayed PENDING (or were later CANCELLED) while still referenced in tasks. Those tasks held large response objects, preventing garbage collection.
Modern fix (Python 3.11+): Use asyncio.TaskGroup to manage lifetimes automatically:
```python
async def main(urls):
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tg.create_task(process(url))
```
For older Python versions, explicitly cancel pending tasks in a finally block and clear the list.
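That pre-3.11 pattern can be sketched like this (process here is a toy coroutine, not the service's real one): on failure, cancel whatever is still pending, await the cancellations, and drop the references so results can be collected.

```python
import asyncio

async def process(url):
    if url == "bad":
        raise ValueError(url)
    await asyncio.sleep(0.05)
    return url

async def main(urls):
    tasks = [asyncio.create_task(process(u)) for u in urls]
    try:
        return await asyncio.gather(*tasks)
    finally:
        # On error, cancel anything still pending, await the
        # cancellations, and drop the references.
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        tasks.clear()

try:
    asyncio.run(main(["a", "bad", "b"]))
except ValueError as e:
    print("failed:", e)  # prints "failed: bad"
```

The second gather with return_exceptions=True absorbs the CancelledErrors so cleanup itself cannot raise.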
A Working Example
Below is the final skeleton I settled on. It uses a semaphore to limit concurrency, a shared aiohttp session, proper exception handling, and timeouts.
```python
import asyncio
import aiohttp
import time
from typing import List

class AsyncFetcher:
    def __init__(self, concurrency: int = 10, timeout: int = 10):
        self.sem = asyncio.Semaphore(concurrency)  # limit concurrent requests
        self.timeout = aiohttp.ClientTimeout(total=timeout)

    async def fetch_one(self, session: aiohttp.ClientSession, url: str) -> dict:
        async with self.sem:
            try:
                async with session.get(url, timeout=self.timeout) as resp:
                    data = await resp.json()
                    return data
            except Exception as exc:
                # Log or handle per-request errors here
                return {"error": str(exc)}

    async def fetch_all(self, urls: List[str]) -> List[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_one(session, url) for url in urls]
            return await asyncio.gather(*tasks, return_exceptions=False)

# Example usage
if __name__ == "__main__":
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]
    fetcher = AsyncFetcher(concurrency=5, timeout=5)
    start = time.time()
    results = asyncio.run(fetcher.fetch_all(urls))
    elapsed = time.time() - start
    print(f"Fetched {len(results)} items in {elapsed:.2f}s")
```
Key takeaways
- Use an async-native web framework or run a dedicated event loop.
- Never mix blocking I/O (e.g., requests) with async code; offload it or switch to an async library.
- Manage tasks with asyncio.TaskGroup (or careful cancellation) to avoid memory leaks.
- Limit concurrency with semaphores and reuse a single aiohttp session for efficiency.