3 Asyncio Pitfalls That Took Me 3 Hours to Debug and Almost Crashed Production

Published: May 1, 2026 at 04:25 PM EDT
4 min read
Source: Dev.to

Introduction

Last week my lead asked me to speed up a data‑aggregation service that calls 20 downstream APIs. The serial version took about 18 seconds, so I tried to refactor it with asyncio. What should have been a quick win turned into three hours of debugging and a near‑production outage. Below are the three biggest pitfalls I encountered and how to avoid them.

Asyncio Basics

  • Event loop – a single‑threaded scheduler that runs coroutines.
  • async def – defines a coroutine function.
  • await – yields control back to the loop while waiting for an awaitable.

A textbook example:

import asyncio

async def fetch(url):
    await asyncio.sleep(1)          # simulate network I/O
    return f"data from {url}"

async def main():
    tasks = [fetch(f"api/{i}") for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

Five requests complete in roughly one second—if everything is truly asynchronous.


Pitfall 1: Using await Inside a Synchronous Function

I dropped await fetch() straight into a Flask route:

@app.route("/data")
def get_data():
    result = await fetch(...)
    return result

  • Error: SyntaxError: 'await' outside async function.
  • Fix: Convert the view to an async function (Flask 2.0+ supports async def views) or run the coroutine on a background event loop.

Flask handles requests in a thread pool, and each worker thread has no event loop of its own. You can spin up a fresh loop per request with asyncio.run(), but that is wasteful, and calling asyncio.run() from code that is already running inside a loop raises RuntimeError: asyncio.run() cannot be called from a running event loop. Better options:

  1. Switch to an async‑native framework (e.g., Quart, FastAPI).
  2. Create a global event loop at startup and schedule work with loop.run_until_complete().
  3. Run a dedicated asyncio thread and communicate via a queue.
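A minimal sketch of option 3, assuming a module-level background loop; the fetch coroutine and names here are illustrative stand-ins, not the real service code:

```python
import asyncio
import threading

# One long-lived event loop, running forever in a daemon thread.
_loop = asyncio.new_event_loop()
_thread = threading.Thread(target=_loop.run_forever, daemon=True)
_thread.start()

async def fetch(url):
    await asyncio.sleep(0.1)  # placeholder for real async I/O
    return f"data from {url}"

def get_data_sync(url):
    # Safe to call from any Flask worker thread: schedules the coroutine
    # on the background loop and blocks until the result is ready.
    future = asyncio.run_coroutine_threadsafe(fetch(url), _loop)
    return future.result(timeout=5)

print(get_data_sync("api/1"))
```

Every synchronous view can now call get_data_sync() without touching the event loop directly; the loop stays alive across requests instead of being created and torn down each time.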

Pitfall 2: Mixing Blocking I/O with Async Code

I tried to parallelise 20 calls with:

results = await asyncio.gather(*[call_api_blocking(i) for i in range(20)])

call_api_blocking was an async def whose body used requests.get(), a synchronous, blocking call. The result was still ~18 seconds: each requests.get() blocked the event loop's only thread, so the "concurrent" calls actually ran one after another.

Solution: Offload blocking calls to a thread pool:

import asyncio
import requests

async def call_api_async(url):
    loop = asyncio.get_running_loop()
    # None = use the default ThreadPoolExecutor; requests.get runs off the loop thread
    return await loop.run_in_executor(None, requests.get, url)

Better yet, replace requests with an async HTTP client such as aiohttp. The rule of thumb: async code must use async I/O primitives exclusively.
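You can see the difference without any network at all. This sketch uses time.sleep as a stand-in for requests.get:

```python
import asyncio
import time

def blocking_call(i):
    time.sleep(0.1)  # stand-in for a blocking requests.get
    return i

async def wrong(i):
    return blocking_call(i)  # blocks the event loop; calls run serially

async def right(i):
    loop = asyncio.get_running_loop()
    # Runs the blocking call in the default thread pool; calls overlap
    return await loop.run_in_executor(None, blocking_call, i)

async def measure(coro_fn, n=5):
    start = time.perf_counter()
    await asyncio.gather(*(coro_fn(i) for i in range(n)))
    return time.perf_counter() - start

serial = asyncio.run(measure(wrong))    # ~0.5 s: each sleep freezes the loop
parallel = asyncio.run(measure(right))  # ~0.1 s: sleeps overlap in worker threads
print(f"blocking: {serial:.2f}s  executor: {parallel:.2f}s")
```

Same gather call in both cases; only the inner coroutine changes, which is exactly why the bug is easy to miss.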


Pitfall 3: Improper Task Management Leading to Memory Leaks

After fixing performance, the service started getting OOM‑killed. The culprit was a manual task list:

tasks = []
for url in urls:
    task = asyncio.create_task(process(url))
    tasks.append(task)

for t in tasks:
    await t

If process(url) raised, the sequential await loop aborted at the first failing task, leaving later tasks running unsupervised while still referenced in tasks. Those references kept large response objects alive, so memory grew until the OOM killer stepped in.

Modern fix (Python 3.11+): Use asyncio.TaskGroup to manage lifetimes automatically:

async def main(urls):
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tg.create_task(process(url))

For older Python versions, explicitly cancel pending tasks in a finally block and clear the list.
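A minimal sketch of that pre-3.11 pattern; process here is a trivial placeholder for the real worker coroutine:

```python
import asyncio

async def process(url):
    await asyncio.sleep(0.1)  # placeholder for real work
    return url

async def main(urls):
    tasks = [asyncio.create_task(process(u)) for u in urls]
    try:
        return await asyncio.gather(*tasks)
    finally:
        # If gather raised, cancel whatever is still in flight, wait for the
        # cancellations to land, then drop the references so the results
        # those tasks held can be garbage-collected.
        for t in tasks:
            if not t.done():
                t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        tasks.clear()

results = asyncio.run(main([f"api/{i}" for i in range(3)]))
print(results)
```

The second gather with return_exceptions=True is important: it absorbs the CancelledError from each cancelled task instead of letting it propagate out of the finally block.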


A Working Example

Below is the final skeleton I settled on. It uses a semaphore to limit concurrency, a shared aiohttp session, proper exception handling, and timeouts.

import asyncio
import aiohttp
import time
from typing import List

class AsyncFetcher:
    def __init__(self, concurrency: int = 10, timeout: int = 10):
        self.sem = asyncio.Semaphore(concurrency)  # limit concurrent requests
        self.timeout = aiohttp.ClientTimeout(total=timeout)

    async def fetch_one(self, session: aiohttp.ClientSession, url: str) -> dict:
        async with self.sem:
            try:
                async with session.get(url, timeout=self.timeout) as resp:
                    data = await resp.json()
                    return data
            except Exception as exc:
                # Log or handle per‑request errors here
                return {"error": str(exc)}

    async def fetch_all(self, urls: List[str]) -> List[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_one(session, url) for url in urls]
            return await asyncio.gather(*tasks, return_exceptions=False)

# Example usage
if __name__ == "__main__":
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]
    fetcher = AsyncFetcher(concurrency=5, timeout=5)

    start = time.time()
    results = asyncio.run(fetcher.fetch_all(urls))
    elapsed = time.time() - start

    print(f"Fetched {len(results)} items in {elapsed:.2f}s")

Key Takeaways

  • Use an async‑native web framework or run a dedicated event loop.
  • Never mix blocking I/O (e.g., requests) with async code; offload it or switch to an async library.
  • Manage tasks with asyncio.TaskGroup (or careful cancellation) to avoid memory leaks.
  • Limit concurrency with semaphores and reuse a single aiohttp session for efficiency.