asyncio 함정: 3시간 버그

발행: (2026년 5월 1일 AM 10:21 GMT+9)
6 분 소요
원문: Dev.to

I’m happy to translate the article for you, but I need the actual text you’d like translated. Could you please paste the content (or the portion you want translated) here? I’ll keep the source link at the top and preserve all formatting, markdown, and code blocks as you requested.

시나리오: 200개의 API 데이터 가져오기, 40 초에서 2 초로

원래 동기식 코드는 다음과 같았습니다 — 간단하지만 고통스럽게 느렸습니다:

import time
import requests

def fetch_all(urls):
    results = []
    for url in urls:
        resp = requests.get(url, timeout=5)
        results.append(resp.json())
    return results

urls = [f"https://api.example.com/item/{i}" for i in range(200)]
start = time.time()
data = fetch_all(urls)
print(f"耗时: {time.time() - start:.2f}s")
# 输出: 耗时: 41.23s

200개의 순차 요청이 40 초가 넘게 걸렸습니다 — 끔찍한 경험이었습니다. 자신감에 차서 asyncio로 다시 작성하려고 했습니다.

핵심 개념: 이벤트 루프 + 코루틴 = 논블로킹 동시성

asyncio는 멀티스레딩과 전혀 다르게 동작합니다. 모든 코루틴이 동일한 스레드 내에서 스케줄되는 단일 스레드 이벤트 루프를 실행합니다. 코루틴이 I/O 대기(네트워크, 디스크 등)에 도달하면 await을 통해 자발적으로 제어권을 이벤트 루프에 반환합니다. 루프는 즉시 실행 준비가 된 다른 코루틴으로 전환합니다. 이렇게 하면 CPU가 I/O를 기다리며 빈둥거리지 않습니다.

가장 기본적인 패턴:

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

urls = [f"https://api.example.com/item/{i}" for i in range(200)]
asyncio.run(main())

asyncio.gather는 모든 코루틴을 동시에 스케줄하므로 전체 실행 시간은 가장 오래 걸리는 요청의 소요 시간 정도이며, 합산된 시간이 아닙니다. 이론적으로는 멋진데, 실제로 실행해 보니 함정이 나타나기 시작했습니다.

Source:

함정 1: 코루틴 내부에서 은밀하게 동기 차단 호출 사용하기

처음에는 async def 로 감싸기만 하면 requests.get 을 코루틴 안에서 계속 사용해도 된다고 생각하고 편법을 택했습니다. 그런데 이벤트 루프가 requests.get 에 완전히 걸려버려서 동시성이 사라졌습니다.

# 错误示范
import requests

async def fetch_bad(url):
    resp = requests.get(url)   # 同步阻塞!事件循环被堵死
    return resp.json()

requests 라이브러리는 동기 I/O를 수행합니다. 호출되면 전체 스레드가 네트워크 응답을 기다리며 차단되고, 이벤트 루프는 제어권을 잃게 됩니다. asyncio 를 사용할 때는 aiohttp(HTTP)나 aiomysql(데이터베이스)처럼 async/await 를 지원하는 라이브러리를 사용해야 합니다.

해결책

모든 I/O를 async/await 생태계의 라이브러리로 교체합니다. 불가피하게 동기 코드를 사용해야 할 경우에는 loop.run_in_executor 로 스레드 풀에 넘겨 비동기화합니다:

import concurrent.futures
import asyncio

def sync_heavy_work(data):
    # 这是一个没法改写的同步 CPU 计算
    return sum(i * i for i in range(data))

async def run_in_thread(data):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, sync_heavy_work, data)
    return result

이렇게 하면 이벤트 루프가 차단되는 것을 방지할 수 있지만, 스레드 전환 오버헤드가 여전히 존재하므로 남용하지 않는 것이 좋습니다.

함정 2: gather가 첫 번째 예외에서 바로 오류를 발생시켜 다른 결과를 버림

200개의 요청을 보낼 때, 몇 개의 타임아웃이나 500 오류는 예상됩니다. 처음 gather를 사용했을 때, 하나의 작업이 예외를 발생시키면 전체 gather가 즉시 예외를 던지고, 나머지 190개 이상의 성공적인 응답은 모두 버려졌습니다.

results = await asyncio.gather(*tasks)  # 一个挂了,全部白干

해결책

gather에는 return_exceptions=True 매개변수가 있습니다. 예외를 발생시키는 대신, 예외 객체를 결과 리스트에 직접 넣어 일반 반환값처럼 처리할 수 있습니다:

results = await asyncio.gather(*tasks, return_exceptions=True)

for i, res in enumerate(results):
    if isinstance(res, Exception):
        print(f"任务 {i} 失败: {res}")
    else:
        process(res)

이렇게 하면 10개의 작업이 타임아웃되더라도 나머지 190개의 데이터는 여전히 얻을 수 있습니다. 또한 fetch 내부에 타임아웃 및 재시도 로직을 추가하는 것을 선호합니다:

from aiohttp import ClientTimeout

async def fetch(session, url, retries=2):
    for attempt in range(retries):
        try:
            async with session.get(url, timeout=ClientTimeout(total=10)) as resp:
                return await resp.json()
        except Exception as e:
            if attempt == retries - 1:
                raise
            # optional: backoff before retrying
0 조회
Back to Blog

관련 글

더 보기 »