내게 3시간을 잡아먹인 asyncio 실수
Source: Dev.to
작년, 내부 운영 플랫폼에 batch domain liveness check 기능을 추가하고 있던 중에 이런 일이 발생했습니다.
요구사항은 간단했습니다: 1 000개 이상의 도메인을 주기적으로 폴링하고, HTTP 상태 코드를 확인한 뒤, 5 초 이내에 응답하지 않으면 해당 도메인을 다운으로 표시하는 것이었습니다.
이 작업이 I/O‑bound 작업이라고 가정하고 asyncio를 사용했습니다. async def, await, asyncio.gather 호출을 여러 개 작성하고 자신 있게 실행했는데… 결과는?
1 000개의 도메인이 4분 이상 걸렸습니다 – 동기식 순차 구현과 거의 구분이 되지 않을 정도였습니다.
그 후 3시간 동안 asyncio에 대한 나의 이해가 무너졌다가 다시 재구성되었습니다. 비동기 함수 안에서 이벤트 루프를 우연히 차단했거나, gather 안에서 예외가 사라진 것을 눈치채지 못한 적이 있다면, 이 전쟁 이야기가 여러분에게 3시간 이상의 시간을 절약해 줄 것입니다.
초기 (잘못된) 구현
코드
import asyncio
import time
import requests
async def check_domain(url: str) -> dict:
"""检测单个域名的状态码和耗时"""
start = time.monotonic()
try:
# 注意这里用的是 requests,同步库
resp = requests.get(url, timeout=5, allow_redirects=True)
status = resp.status_code
except Exception as e:
status = str(e)
elapsed = time.monotonic() - start
return {"url": url, "status": status, "elapsed": elapsed}
async def main():
urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)] # 模拟慢速接口
t_start = time.monotonic()
# 希望全部并发
results = await asyncio.gather(*[check_domain(url) for url in urls])
t_end = time.monotonic()
print(f"总耗时 {t_end - t_start:.2f} 秒,完成 {len(results)} 个检测")
# 打印前 3 个结果
for r in results[:3]:
print(r)
if __name__ == "__main__":
asyncio.run(main())
문제를 바로 발견할 수 있을 겁니다: 비동기 코루틴 안에서 동기 차단 requests.get을 호출하는 것.
그때 나는 “async def로 정의했으니 코루틴이고, gather가 병렬로 실행해 줄 거야” 라는 생각에 사로잡혀 있었고, 이벤트 루프가 실제로 어떻게 동작하는지 완전히 무시했습니다.
1초 지연이 있는 50개의 URL을 처리했을 때 전체 실행 시간이 50초 이상이 걸렸습니다 – 비동기 문법 뒤에 숨겨진 순차 요청의 전형적인 사례입니다.
Why It Doesn’t Work
asyncio는 단일 스레드 이벤트 루프에서 실행됩니다.async def만으로는 동시성이 추가되지 않으며; 함수가 잠재적으로 await 가능하다는 표시만 합니다.- 실제 동시성은 코루틴이 비동기 객체(예:
aiohttp요청)를 await 할 때 발생합니다. requests.get은 동기식 소켓 I/O를 수행합니다; 이 호출이 블록되는 동안 전체 스레드(따라서 이벤트 루프)도 멈춥니다.asyncio.gather는 여전히 코루틴들을 스케줄하지만, 각 코루틴이requests.get이 반환될 때까지 루프를 차단하므로 하나씩 순차적으로 실행됩니다.
aiohttp를 사용한 올바른 구현
코드
import aiohttp
import asyncio
import time
async def check_domain_async(session: aiohttp.ClientSession, url: str) -> dict:
"""真正的异步检测"""
start = time.monotonic()
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
status = resp.status
except Exception as e:
status = str(e)
elapsed = time.monotonic() - start
return {"url": url, "status": status, "elapsed": elapsed}
async def main_async():
urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]
t_start = time.monotonic()
async with aiohttp.ClientSession() as session:
tasks = [check_domain_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
t_end = time.monotonic()
print(f"总耗时 {t_end - t_start:.2f} 秒,完成 {len(results)} 个检测")
aiohttp를 사용하면 50개의 요청이 약 1.5 seconds 안에 완료됩니다(서버가 따라줄 수 있다고 가정할 때). 속도 향상이 눈에 띄며, 문제점이 사라집니다.
asyncio.gather 로 예외 처리하기
보다 미묘한 함정은 견고한 오류 처리가 필요할 때 나타납니다. 기본적으로 gather에 전달된 어떤 코루틴이라도 예외를 발생시키면, gather는 모든 작업이 완료된 후 그 예외를 전파합니다. 이때 다른 코루틴에서 발생한 오류가 묻혀버릴 수 있습니다.
try:
results = await asyncio.gather(*tasks)
except Exception:
logger.error("批量检测出错")
이 패턴에서는:
- 예외는 잡히지만 어떤 작업이 실패했는지, 왜 실패했는지에 대한 정보를 잃게 됩니다.
- 다른 작업들은 계속 실행되며, 조기에 중단하고 싶을 경우 바람직하지 않을 수 있습니다.
더 안전한 패턴
return_exceptions=True를 사용해 결과와 예외를 함께 수집합니다:
results = await asyncio.gather(*tasks, return_exceptions=True)
for idx, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Task {idx} failed: {result}")
else:
process(result)
- 또는 첫 번째 실패가 발생했을 때 남은 작업을 수동으로 취소합니다:
tasks = [asyncio.create_task(coro) for coro in coros]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
for task in pending:
task.cancel()
gather 의 예외 전파 방식을 이해하면 조용히 사라지는 오류를 방지하고, 동시 실행 코드를 보다 신뢰성 있게 만들 수 있습니다.
핵심 요점: async def 로 함수를 선언하는 것만으로는 충분하지 않습니다. 진정한 비동기 I/O(예: aiohttp)를 사용하고, asyncio.gather 가 예외를 어떻게 전파하는지 숙지해야 합니다. 이 세부 사항을 마스터하면, 성능 버그를 잡기 위해 몇 시간을 허비하는 일을 피할 수 있습니다.