3시간을 낭비한 asyncio 함정, 거의 프로덕션을 무너뜨릴 뻔했다
Source: Dev.to
사고 개요
지난 금요일 오후 5시, 노트북을 닫고 몰래 나가려던 순간, 알림 채널이 폭발했습니다 — 온라인 데이터 수집 서비스의 타임아웃 비율이 40 %까지 급등했고, 모든 하위 보고서는 비어 있었습니다. 로그를 확인해 보니 수천 개의 URL을 처리하던 크롤러가 아직도 오래된 동기식 requests 라이브러리를 사용해 하나씩 가져오고 있었습니다. 각 요청은 평균 1.2 초가 걸렸고, 전체 라운드에는 거의 20 분이 소요되었지만, 비즈니스 요구사항은 5 분 이내 완료를 요구했습니다. 떠나기 전에 asyncio 로 동시성을 위해 재작성하고 배포하겠다는 생각만이 머릿속을 스쳤습니다.
그 결정은 세 가지 큰 함정으로 이어졌고, 서비스가 거의 망가질 뻔했습니다. 아래는 제가 얻은 교훈으로, 여러분이 그 세 시간을 절약하길 바랍니다.
asyncio 이해하기
asyncio의 핵심은 이벤트 루프와 코루틴입니다.
- 이벤트 루프는 지속적으로 폴링하는 스케줄러입니다.
- 각 코루틴은 자발적으로 일시 정지하고 제어권을 반환할 수 있는 작업입니다.
코루틴이 네트워크 응답(입출력)을 기다리고 있을 때, 이벤트 루프는 즉시 다른 준비된 코루틴으로 전환하여 CPU가 유휴 상태로 회전하는 것을 방지합니다.
전통적인 멀티스레딩과 가장 큰 차이점은 asyncio가 단일 스레드 내에서 협력적 스케줄링을 사용한다는 점으로, 스레드 전환 오버헤드와 GIL 경쟁을 피할 수 있습니다. 특히 네트워크 요청이 많은 상황에서 큰 장점을 발휘합니다.
일반적인 패턴:
async def coro():
await async_io_operation()
asyncio.gather()를 사용해 여러 코루틴을 모읍니다. 전체 소요 시간은 가장 느린 작업에 의해 결정되며, 모든 작업을 합한 시간에 의해 결정되지 않습니다.
순진한 구현 (하지 말아야 할 것)
import asyncio
import requests # 同步库,不能用!
async def fetch(url):
# 错误示范:直接把同步的 requests 放在协程里
resp = requests.get(url, timeout=5) # 这次调用会阻塞整个线程!
return resp.status_code
async def main():
urls = ["https://httpbin.org/delay/1"] * 10
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
이를 실행하면 모든 요청이 여전히 순차적으로 수행됩니다. requests.get()은 차단 호출이므로 네트워크를 기다리는 동안 이벤트 루프에 제어권을 반환하지 않아 한 번에 하나의 코루틴만 실행됩니다. 따라서 이벤트 루프는 사실상 쓸모가 없게 됩니다.
Correct async HTTP client
import asyncio
import aiohttp
async def fetch(session, url):
# 使用 aiohttp 的异步请求,await 时将控制权交还事件循环
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return await resp.text()
async def main():
urls = ["https://httpbin.org/delay/1"] * 10
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"完成 {len(results)} 个请求")
asyncio.run(main())
이제 이벤트 루프는 실제로 1초짜리 요청 10개를 동시에 실행하여 10초가 아니라 약 1초 만에 완료됩니다. 실제 운영 환경에서는 크롤러가 20 분에서 2 분 이하로 단축되었습니다.
예외 처리 및 동시성 제한
URL 목록이 수백 개에 달했을 때, 가끔 발생하는 타임아웃이나 DNS 오류 때문에 asyncio.gather()가 즉시 예외를 발생시키고 남은 코루틴을 취소하여 전체 배치를 망가뜨렸습니다.
async def fetch_with_sem(sem, session, url):
async with sem: # 限制并发数,防止瞬间占满文件描述符
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
return url, await resp.text()
except Exception as e:
return url, f"ERROR: {e}"
async def main():
urls = [...] # 几百个 URL
sem = asyncio.Semaphore(50) # 限制并发,避免触发系统或服务端限制
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_sem(sem, session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True) # 关键!
for url, content in results:
if isinstance(content, Exception):
print(f"{url} 失败: {content}")
else:
process(content)
return_exceptions=True옵션을 사용하면gather가 전체 배치를 중단하는 대신 예외 객체를 반환합니다.- 세마포어는 동시 연결 수(예: 50)를 제한하여 파일 디스크립터가 고갈되거나 원격 서비스의 속도 제한에 걸리는 것을 방지합니다.
- 예외를 잡아두고 필요에 따라 재시도하면 안정적인 서비스를 제공할 수 있습니다.
실용적인 팁
- 코루틴 안에서
time.sleep()을 절대 호출하지 마세요 – 전체 스레드를 차단합니다. 대신await asyncio.sleep()을 사용하세요. - 무제한 동시성을 조심하세요 – 한 번에 수천 개의 연결을 생성하면 파일 디스크립터 한도를 초과하거나 서버의 속도 제한에 걸릴 수 있습니다.
asyncio.Semaphore또는 연결 풀 제한을 사용하세요. - 백오프와 재시도를 구현하세요 – 일시적인 네트워크 문제는 정상입니다. 견고한 프로덕션 수준 코드를 위해
return_exceptions=True와 지수 백오프 재시도를 결합하세요.
Conclusion
동기 I/O‑바운드 서비스를 asyncio로 다시 작성하는 것은 가장 만족스러운 최적화 중 하나입니다. 하지만 이러한 함정에 주의하지 않으면 악몽으로 변할 수 있습니다. 저는 3시간을 잃었고 거의 안정적인 프로덕션을 금요일에 잃을 뻔했습니다. 이 글이 여러분을 같은 운명에서 구해주길 바랍니다.