비동기 작업 통합: Celery와 Flask-SocketIO 연결
Source: Dev.to
(번역을 진행하려면 번역하고자 하는 전체 텍스트를 제공해 주세요.)
소개
현대 웹 애플리케이션 개발에서 동기식 요청‑응답 사이클은 종종 병목 현상이 됩니다. 복잡한 PDF 보고서 생성, 비디오 업로드 처리, 머신러닝 모델 학습과 같은 작업은 몇 분에서 몇 시간까지 걸릴 수 있습니다. 이 기간 동안 표준 HTTP 요청을 차단하는 것은 받아들일 수 없으며—사용자 경험을 저하시키고, 서버 자원을 잡아두며, Nginx와 같은 리버스 프록시에서 타임아웃을 유발하기도 합니다.
표준적인 해결책은 이러한 무거운 작업을 Celery와 같은 분산 작업 큐에 오프로드하는 것입니다. 이렇게 하면 실행이 웹 서버와 분리됩니다. 그러나 이 분리는 새로운 문제를 낳습니다: 관측성. 작업이 백그라운드로 전송되면 웹 서버는 그 진행 상황을 알 수 없게 됩니다. “보고서가 50 % 완료되었습니다”라고 사용자에게 어떻게 알려줄 수 있을까요?
전통적으로 개발자들은 짧은 간격으로 (/status를 매초 요청) short‑polling을 사용했는데, 이는 서버에 중복된 요청을 폭풍처럼 몰아넣습니다. 훨씬 더 우수한 아키텍처는 Flask‑SocketIO를 활용해 백그라운드 워커가 클라이언트에게 실시간 업데이트를 푸시하도록 하는 것입니다. 이 글에서는 Celery 워커 프로세스와 Flask‑SocketIO 웹 서버 사이의 연결을 구현하는 데 필요한 엔지니어링을 자세히 다룹니다.
예시: “데이터 내보내기” 기능
| 단계 | 설명 |
|---|---|
| 트리거 | 사용자가 Export Data 버튼을 클릭합니다. |
| 응답 | 서버는 즉시 HTTP 202 Accepted 응답을 반환하여 작업이 시작되었음을 알립니다. |
| 실행 | 백엔드가 데이터베이스를 조회하고 큰 CSV 파일을 포맷합니다. |
| 피드백 | 사용자는 진행률 바가 0 %에서 100 %까지 실시간으로 움직이는 것을 보고, 완료 시 다운로드 링크를 받습니다. |
WebSocket 없이 프론트엔드는 서버를 맹목적으로 폴링해야 합니다.
WebSocket을 사용하면 백그라운드 워커가 상태가 변할 때만 (progress: 10 %, progress: 20 %) 이벤트를 능동적으로 전송합니다. 이는 네트워크 오버헤드를 줄이고 애플리케이션에 “실시간” 느낌을 제공합니다.
Source: …
프로세스 분리
- Web Server (Flask + Gunicorn + Eventlet) – 브라우저와의 활성 WebSocket 연결을 관리하고 열려 있는 TCP 소켓의 파일 디스크립터를 보유합니다.
- Worker (Celery) – 완전히 별개의 OS 프로세스로, 경우에 따라 다른 서버에 있을 수도 있습니다. Web Server가 보유한 WebSocket 연결에 접근 권한이 없으며 따라서 사용자 브라우저와 직접 “통신”할 수 없습니다.
메인 Flask 앱에서 전역 socketio 인스턴스를 Celery 작업으로 가져와 .emit()을 호출하면, 해당 호출은 조용히 실패하거나 오류를 발생시킵니다. Celery 워커는 자체 메모리 공간에서 동작하며 Web Server에 연결된 클라이언트들을 알지 못합니다.
브리지: 메시지 브로커
우리는 중간 매개체인 메시지 브로커(보통 Redis 또는 RabbitMQ)를 공유 이벤트 버스로 사용합니다.
# Flask‑SocketIO external emitter
SocketIO(message_queue='redis://localhost:6379/0')
Flask‑SocketIO의 External Emitters는 격리 문제를 해결합니다. SocketIO 클래스를 message_queue 인자로 설정하면, 클라이언트 연결을 유지하는 대신 브로커에 연결하는 쓰기 전용 클라이언트를 생성합니다.
- Celery 워커가
emit()을 호출하면, 라이브러리는 이벤트를 직렬화하여 Redis Pub/Sub 채널(예:flask-socketio)에 게시합니다. - 이 채널을 구독하고 있는 Web Server 프로세스가 메시지를 받아 디코드하고, 대상 클라이언트를 식별한 뒤 실제 WebSocket 연결을 통해 페이로드를 전달합니다.
따라서 동일한 Redis 백엔드를 공유하는 어떤 프로세스든 웹 클라이언트에게 메시지를 보낼 수 있습니다.
Common Pitfall: “Working outside of application context”
RuntimeError: Working outside of application context.
이 오류는 Flask 확장 기능들이 활성화된 Flask 애플리케이션(또는 요청) 컨텍스트를 기대하기 때문에 발생합니다.
실수 – tasks.py에서 Flask 앱에 연결된 socketio 객체를 임포트하고 이를 Celery 워커 안에서 사용한 경우. 워커에는 Flask 앱이 실행되고 있지 않으므로 컨텍스트에 바인딩된 전역 변수가 작동하지 않습니다.
해결 방법 – 독립형 SocketIO 객체를 Celery 모듈 내부에서 인스턴스화합니다. 이 객체는 Flask 앱을 필요로 하지 않으며, 메시지 큐에 대한 연결 문자열만 있으면 됩니다. 독립형 emitter를 사용하면 워커 내에서 Flask 애플리케이션 컨텍스트가 전혀 필요하지 않게 되어 백그라운드 작업을 더 깔끔하고 견고하게 만들 수 있습니다.
Source:
프로덕션‑레디 패턴
아래는 Celery와 Flask‑SocketIO를 통합하는 방법을 보여주는 최소한의 프로덕션‑레디 예시입니다.
app.py – 웹 서버 (Flask + SocketIO)
from flask import Flask, request
from flask_socketio import SocketIO
from celery_worker import make_celery
app = Flask(__name__)
# Configuration
app.config['SECRET_KEY'] = 'secret!'
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# Initialise SocketIO with a message queue (Redis)
socketio = SocketIO(app, message_queue=app.config['CELERY_BROKER_URL'])
# Initialise Celery
celery = make_celery(app)
@app.route('/start-task', methods=['POST'])
def start_task():
"""
Starts a long‑running background task.
The client’s SocketIO session ID (sid) is passed to the task
so we can target the correct browser later.
"""
data = request.json.get('data')
sid = request.sid # Flask‑SocketIO injects this attribute
task = celery.send_task('tasks.long_running_task', args=[data, sid])
return {'task_id': task.id}, 202
if __name__ == '__main__':
# `socketio.run` starts the Flask app with the appropriate async mode
socketio.run(app, debug=True)
celery_worker.py – Celery 팩토리
from celery import Celery
def make_celery(app):
"""
Creates a Celery instance that uses the Flask app’s configuration.
"""
celery = Celery(
app.import_name,
broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND']
)
celery.conf.update(app.config)
# Optional: make Flask app context available inside tasks
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
tasks.py – Celery 워커 (독립형 SocketIO emitter)
import time
from celery import Celery
from flask_socketio import SocketIO
# ----------------------------------------------------------------------
# 1️⃣ Message‑queue URL (Redis)
# ----------------------------------------------------------------------
REDIS_URL = 'redis://localhost:6379/0'
# ----------------------------------------------------------------------
# 2️⃣ Celery instance (standard configuration)
# ----------------------------------------------------------------------
celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
# ----------------------------------------------------------------------
# 3️⃣ Stand‑alone SocketIO emitter
# ----------------------------------------------------------------------
# No Flask app is passed – this is an *external* emitter that writes
# to the Redis message queue.
socketio_emitter = SocketIO(message_queue=REDIS_URL)
@celery.task(name='tasks.long_running_task')
def long_running_task(payload, sid):
"""
Simulates a long‑running job and pushes progress updates
to the client identified by `sid`.
"""
total_steps = 5
for step in range(1, total_steps + 1):
# Simulate work
time.sleep(2)
# Compute progress percentage
progress = int(step / total_steps * 100)
# Emit progress to the specific client
socketio_emitter.emit(
'task_progress',
{'progress': progress, 'task_id': long_running_task.request.id},
room=sid # Target the correct browser session
)
# Final notification – task completed
socketio_emitter.emit(
'task_complete',
{'message': 'Export ready', 'task_id': long_running_task.request.id},
room=sid
)
Recap
| 필요 항목 | 왜 중요한가 |
|---|---|
| 워커에서 별도의 SocketIO emitter 사용 | “application context 외부에서 작업” 오류를 방지합니다. |
| 메시지 큐로 Redis (또는 RabbitMQ) 사용 | 웹 서버와 워커 간에 공유 Pub/Sub 채널을 제공합니다. |
클라이언트의 sid를 작업에 전달 | 워커가 emit 할 때 특정 브라우저 세션을 대상으로 할 수 있게 합니다. |
socketio.emit(..., room=sid) 사용 | 업데이트를 모든 사람에게가 아니라 의도된 클라이언트에게만 보냅니다. |
위 패턴을 따르면 다음과 같은 이점을 얻을 수 있습니다:
- 실시간 진행 상황 피드백을 폴링 없이 제공합니다.
- 확장 가능한 아키텍처 – 워커를 별도 머신에서 실행할 수 있습니다.
- 관심사의 명확한 분리 – 웹 서버는 연결을 처리하고, 워커는 무거운 작업을 수행합니다.
코딩 즐겁게! 🚀
Source:
Celery 작업 예시
import time
from celery import Celery
from flask_socketio import SocketIO
REDIS_URL = 'redis://localhost:6379/0'
celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
socketio_emitter = SocketIO(message_queue=REDIS_URL)
@celery.task(name='tasks.long_running_task')
def long_running_task(data, user_sid):
"""
Background task that updates the user on progress.
"""
total_steps = 5
for i in range(total_steps):
time.sleep(1) # Simulate work
progress = int((i + 1) / total_steps * 100)
# Emit to the specific user's room (their session ID)
socketio_emitter.emit(
'progress',
{'percent': progress, 'status': 'Processing...'},
room=user_sid,
)
# Final completion event
socketio_emitter.emit('completion', {'result': 'Done!'}, room=user_sid)
return "OK"
Source: …
아키텍처 고려 사항
이 아키텍처는 강력하지만 관리해야 할 복잡성을 도입합니다.
지연 시간
- Celery에서 발생한 모든 이벤트는 Redis로 네트워크 홉을 거쳐 전달되고, 처리된 뒤 웹 서버가 받아 클라이언트에 전송됩니다.
- 일반적인 지연 시간은 10 ms 이하이며, 이는 직접 메모리 내에서 emit 하는 경우보다 높습니다.
전달 보장
- Redis Pub/Sub은 fire‑and‑forget 방식입니다.
- 웹 서버가 재시작하거나 Celery 워커가 메시지를 emit 하는 순간 Redis와의 연결이 일시적으로 끊기면 해당 업데이트는 손실됩니다.
- 중요한 알림의 경우 영구 인박스 또는 수신 확인 시스템 구현을 고려하십시오.
의존성 관리
- 웹 서버와 Celery 워커 모두 Flask‑SocketIO와 Redis의 호환 가능한 버전에 의존해야 합니다.
- 직렬화 프로토콜이 일치하지 않으면 메시지가 조용히 무시될 수 있습니다.
워커 확장
- 이 아키텍처는 수평 확장이 용이합니다.
- 모든 워커가 동일한 메시지‑큐 Redis URL을 가리키는 한 100개의 Celery 워커를 추가하는 것은 간단하며, 이들은 클라이언트에 업데이트를 원활히 푸시할 수 있습니다.
이미지 갤러리
(여기에 이미지를 삽입하세요)
Celery와 Flask‑SocketIO 연결
Celery와 Flask‑SocketIO를 연결하면 사용자를 방치하지 않고도 무거운 작업을 처리하는 반응형이며 전문가 수준의 애플리케이션을 구축할 수 있습니다. 핵심은 worker와 web server가 별개의 엔티티이며, 중립적인 제3자—Redis 메시지 큐를 통해 통신해야 한다는 점을 이해하는 것입니다.
프로덕션‑준비 체크리스트
-
Redis 구성
- Flask 앱과 독립형 Celery SocketIO 인스턴스 모두에서 Redis가
message_queue로 구성되어 있는지 확인합니다.
- Flask 앱과 독립형 Celery SocketIO 인스턴스 모두에서 Redis가
-
사용자 타깃팅
- 특정 사용자를 대상으로 하려면
request.sid(또는 전용 방 이름)를 Celery 작업에 전달합니다.
- 특정 사용자를 대상으로 하려면
-
SocketIO 초기화
- Celery 워커에서 SocketIO 생성자에 Flask 앱 객체를 전달 하지 마세요.
-
몽키패치 (해당되는 경우)
- 웹 서버의 진입점에서 몽키패치(예: Gevent/Eventlet)를 적용합니다.