비동기 작업 통합: Celery와 Flask-SocketIO 연결

발행: (2025년 12월 19일 오전 07:30 GMT+9)
14 min read
원문: Dev.to

Source: Dev.to

(번역을 진행하려면 번역하고자 하는 전체 텍스트를 제공해 주세요.)

소개

현대 웹 애플리케이션 개발에서 동기식 요청‑응답 사이클은 종종 병목 현상이 됩니다. 복잡한 PDF 보고서 생성, 비디오 업로드 처리, 머신러닝 모델 학습과 같은 작업은 몇 분에서 몇 시간까지 걸릴 수 있습니다. 이 기간 동안 표준 HTTP 요청을 차단하는 것은 받아들일 수 없으며—사용자 경험을 저하시키고, 서버 자원을 잡아두며, Nginx와 같은 리버스 프록시에서 타임아웃을 유발하기도 합니다.

표준적인 해결책은 이러한 무거운 작업을 Celery와 같은 분산 작업 큐에 오프로드하는 것입니다. 이렇게 하면 실행이 웹 서버와 분리됩니다. 그러나 이 분리는 새로운 문제를 낳습니다: 관측성. 작업이 백그라운드로 전송되면 웹 서버는 그 진행 상황을 알 수 없게 됩니다. “보고서가 50 % 완료되었습니다”라고 사용자에게 어떻게 알려줄 수 있을까요?

전통적으로 개발자들은 짧은 간격으로 (/status를 매초 요청) short‑polling을 사용했는데, 이는 서버에 중복된 요청을 폭풍처럼 몰아넣습니다. 훨씬 더 우수한 아키텍처는 Flask‑SocketIO를 활용해 백그라운드 워커가 클라이언트에게 실시간 업데이트를 푸시하도록 하는 것입니다. 이 글에서는 Celery 워커 프로세스와 Flask‑SocketIO 웹 서버 사이의 연결을 구현하는 데 필요한 엔지니어링을 자세히 다룹니다.

예시: “데이터 내보내기” 기능

단계설명
트리거사용자가 Export Data 버튼을 클릭합니다.
응답서버는 즉시 HTTP 202 Accepted 응답을 반환하여 작업이 시작되었음을 알립니다.
실행백엔드가 데이터베이스를 조회하고 큰 CSV 파일을 포맷합니다.
피드백사용자는 진행률 바가 0 %에서 100 %까지 실시간으로 움직이는 것을 보고, 완료 시 다운로드 링크를 받습니다.

WebSocket 없이 프론트엔드는 서버를 맹목적으로 폴링해야 합니다.
WebSocket을 사용하면 백그라운드 워커가 상태가 변할 때만 (progress: 10 %, progress: 20 %) 이벤트를 능동적으로 전송합니다. 이는 네트워크 오버헤드를 줄이고 애플리케이션에 “실시간” 느낌을 제공합니다.

Source:

프로세스 분리

  1. Web Server (Flask + Gunicorn + Eventlet) – 브라우저와의 활성 WebSocket 연결을 관리하고 열려 있는 TCP 소켓의 파일 디스크립터를 보유합니다.
  2. Worker (Celery) – 완전히 별개의 OS 프로세스로, 경우에 따라 다른 서버에 있을 수도 있습니다. Web Server가 보유한 WebSocket 연결에 접근 권한이 없으며 따라서 사용자 브라우저와 직접 “통신”할 수 없습니다.

메인 Flask 앱에서 전역 socketio 인스턴스를 Celery 작업으로 가져와 .emit()을 호출하면, 해당 호출은 조용히 실패하거나 오류를 발생시킵니다. Celery 워커는 자체 메모리 공간에서 동작하며 Web Server에 연결된 클라이언트들을 알지 못합니다.

브리지: 메시지 브로커

우리는 중간 매개체인 메시지 브로커(보통 Redis 또는 RabbitMQ)를 공유 이벤트 버스로 사용합니다.

# Flask‑SocketIO external emitter
SocketIO(message_queue='redis://localhost:6379/0')

Flask‑SocketIO의 External Emitters는 격리 문제를 해결합니다. SocketIO 클래스를 message_queue 인자로 설정하면, 클라이언트 연결을 유지하는 대신 브로커에 연결하는 쓰기 전용 클라이언트를 생성합니다.

  • Celery 워커가 emit()을 호출하면, 라이브러리는 이벤트를 직렬화하여 Redis Pub/Sub 채널(예: flask-socketio)에 게시합니다.
  • 이 채널을 구독하고 있는 Web Server 프로세스가 메시지를 받아 디코드하고, 대상 클라이언트를 식별한 뒤 실제 WebSocket 연결을 통해 페이로드를 전달합니다.

따라서 동일한 Redis 백엔드를 공유하는 어떤 프로세스든 웹 클라이언트에게 메시지를 보낼 수 있습니다.

Common Pitfall: “Working outside of application context”

RuntimeError: Working outside of application context.

이 오류는 Flask 확장 기능들이 활성화된 Flask 애플리케이션(또는 요청) 컨텍스트를 기대하기 때문에 발생합니다.

실수tasks.py에서 Flask 앱에 연결된 socketio 객체를 임포트하고 이를 Celery 워커 안에서 사용한 경우. 워커에는 Flask 앱이 실행되고 있지 않으므로 컨텍스트에 바인딩된 전역 변수가 작동하지 않습니다.

해결 방법독립형 SocketIO 객체를 Celery 모듈 내부에서 인스턴스화합니다. 이 객체는 Flask 앱을 필요로 하지 않으며, 메시지 큐에 대한 연결 문자열만 있으면 됩니다. 독립형 emitter를 사용하면 워커 내에서 Flask 애플리케이션 컨텍스트가 전혀 필요하지 않게 되어 백그라운드 작업을 더 깔끔하고 견고하게 만들 수 있습니다.

Source:

프로덕션‑레디 패턴

아래는 Celery와 Flask‑SocketIO를 통합하는 방법을 보여주는 최소한의 프로덕션‑레디 예시입니다.

app.py – 웹 서버 (Flask + SocketIO)

from flask import Flask, request
from flask_socketio import SocketIO
from celery_worker import make_celery

app = Flask(__name__)

# Configuration
app.config['SECRET_KEY'] = 'secret!'
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Initialise SocketIO with a message queue (Redis)
socketio = SocketIO(app, message_queue=app.config['CELERY_BROKER_URL'])

# Initialise Celery
celery = make_celery(app)


@app.route('/start-task', methods=['POST'])
def start_task():
    """
    Starts a long‑running background task.
    The client’s SocketIO session ID (sid) is passed to the task
    so we can target the correct browser later.
    """
    data = request.json.get('data')
    sid = request.sid  # Flask‑SocketIO injects this attribute
    task = celery.send_task('tasks.long_running_task', args=[data, sid])
    return {'task_id': task.id}, 202


if __name__ == '__main__':
    # `socketio.run` starts the Flask app with the appropriate async mode
    socketio.run(app, debug=True)

celery_worker.py – Celery 팩토리

from celery import Celery

def make_celery(app):
    """
    Creates a Celery instance that uses the Flask app’s configuration.
    """
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    # Optional: make Flask app context available inside tasks
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

tasks.py – Celery 워커 (독립형 SocketIO emitter)

import time
from celery import Celery
from flask_socketio import SocketIO

# ----------------------------------------------------------------------
# 1️⃣  Message‑queue URL (Redis)
# ----------------------------------------------------------------------
REDIS_URL = 'redis://localhost:6379/0'

# ----------------------------------------------------------------------
# 2️⃣  Celery instance (standard configuration)
# ----------------------------------------------------------------------
celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)

# ----------------------------------------------------------------------
# 3️⃣  Stand‑alone SocketIO emitter
# ----------------------------------------------------------------------
# No Flask app is passed – this is an *external* emitter that writes
# to the Redis message queue.
socketio_emitter = SocketIO(message_queue=REDIS_URL)


@celery.task(name='tasks.long_running_task')
def long_running_task(payload, sid):
    """
    Simulates a long‑running job and pushes progress updates
    to the client identified by `sid`.
    """
    total_steps = 5
    for step in range(1, total_steps + 1):
        # Simulate work
        time.sleep(2)

        # Compute progress percentage
        progress = int(step / total_steps * 100)

        # Emit progress to the specific client
        socketio_emitter.emit(
            'task_progress',
            {'progress': progress, 'task_id': long_running_task.request.id},
            room=sid  # Target the correct browser session
        )

    # Final notification – task completed
    socketio_emitter.emit(
        'task_complete',
        {'message': 'Export ready', 'task_id': long_running_task.request.id},
        room=sid
    )

Recap

필요 항목왜 중요한가
워커에서 별도의 SocketIO emitter 사용“application context 외부에서 작업” 오류를 방지합니다.
메시지 큐로 Redis (또는 RabbitMQ) 사용웹 서버와 워커 간에 공유 Pub/Sub 채널을 제공합니다.
클라이언트의 sid를 작업에 전달워커가 emit 할 때 특정 브라우저 세션을 대상으로 할 수 있게 합니다.
socketio.emit(..., room=sid) 사용업데이트를 모든 사람에게가 아니라 의도된 클라이언트에게만 보냅니다.

위 패턴을 따르면 다음과 같은 이점을 얻을 수 있습니다:

  • 실시간 진행 상황 피드백을 폴링 없이 제공합니다.
  • 확장 가능한 아키텍처 – 워커를 별도 머신에서 실행할 수 있습니다.
  • 관심사의 명확한 분리 – 웹 서버는 연결을 처리하고, 워커는 무거운 작업을 수행합니다.

코딩 즐겁게! 🚀

Source:

Celery 작업 예시

import time
from celery import Celery
from flask_socketio import SocketIO

REDIS_URL = 'redis://localhost:6379/0'

celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
socketio_emitter = SocketIO(message_queue=REDIS_URL)


@celery.task(name='tasks.long_running_task')
def long_running_task(data, user_sid):
    """
    Background task that updates the user on progress.
    """
    total_steps = 5
    for i in range(total_steps):
        time.sleep(1)  # Simulate work
        progress = int((i + 1) / total_steps * 100)

        # Emit to the specific user's room (their session ID)
        socketio_emitter.emit(
            'progress',
            {'percent': progress, 'status': 'Processing...'},
            room=user_sid,
        )

    # Final completion event
    socketio_emitter.emit('completion', {'result': 'Done!'}, room=user_sid)
    return "OK"

Source:

아키텍처 고려 사항

이 아키텍처는 강력하지만 관리해야 할 복잡성을 도입합니다.

지연 시간

  • Celery에서 발생한 모든 이벤트는 Redis로 네트워크 홉을 거쳐 전달되고, 처리된 뒤 웹 서버가 받아 클라이언트에 전송됩니다.
  • 일반적인 지연 시간은 10 ms 이하이며, 이는 직접 메모리 내에서 emit 하는 경우보다 높습니다.

전달 보장

  • Redis Pub/Sub은 fire‑and‑forget 방식입니다.
  • 웹 서버가 재시작하거나 Celery 워커가 메시지를 emit 하는 순간 Redis와의 연결이 일시적으로 끊기면 해당 업데이트는 손실됩니다.
  • 중요한 알림의 경우 영구 인박스 또는 수신 확인 시스템 구현을 고려하십시오.

의존성 관리

  • 웹 서버와 Celery 워커 모두 Flask‑SocketIORedis의 호환 가능한 버전에 의존해야 합니다.
  • 직렬화 프로토콜이 일치하지 않으면 메시지가 조용히 무시될 수 있습니다.

워커 확장

  • 이 아키텍처는 수평 확장이 용이합니다.
  • 모든 워커가 동일한 메시지‑큐 Redis URL을 가리키는 한 100개의 Celery 워커를 추가하는 것은 간단하며, 이들은 클라이언트에 업데이트를 원활히 푸시할 수 있습니다.

이미지 갤러리

(여기에 이미지를 삽입하세요)


Celery와 Flask‑SocketIO 연결

Celery와 Flask‑SocketIO를 연결하면 사용자를 방치하지 않고도 무거운 작업을 처리하는 반응형이며 전문가 수준의 애플리케이션을 구축할 수 있습니다. 핵심은 workerweb server가 별개의 엔티티이며, 중립적인 제3자—Redis 메시지 큐를 통해 통신해야 한다는 점을 이해하는 것입니다.

프로덕션‑준비 체크리스트

  • Redis 구성

    • Flask 앱과 독립형 Celery SocketIO 인스턴스 모두에서 Redis가 message_queue 로 구성되어 있는지 확인합니다.
  • 사용자 타깃팅

    • 특정 사용자를 대상으로 하려면 request.sid(또는 전용 방 이름)를 Celery 작업에 전달합니다.
  • SocketIO 초기화

    • Celery 워커에서 SocketIO 생성자에 Flask 앱 객체를 전달 하지 마세요.
  • 몽키패치 (해당되는 경우)

    • 웹 서버의 진입점에서 몽키패치(예: Gevent/Eventlet)를 적용합니다.
Back to Blog

관련 글

더 보기 »

Celery와 Redis를 이용한 스케일링

동기식 인덱싱의 문제점 사용자가 YouTube 재생목록의 인덱싱을 시작하면, 백엔드는 각 비디오를 하나씩 순차적으로 처리하는 동기식 루프를 실행했습니다.

celery-plus 🥬 — Node.js용 현대적인 Celery

왜 확인해 보세요? - 🚀 기존 Python Celery 워커와 함께 작동합니다 - 📘 TypeScript로 작성되었으며 전체 타입을 제공합니다 - 🔄 RabbitMQ AMQP와 Redis를 지원합니다 - ⚡ Async/a...