异步任务集成:连接 Celery 与 Flask-SocketIO

发布: (2025年12月19日 GMT+8 06:30)
11 min read
原文: Dev.to

Source: Dev.to

Introduction

在现代 Web 应用开发中,同步的请求‑响应循环往往成为瓶颈。生成复杂的 PDF 报告、处理视频上传或训练机器学习模型等操作可能需要数分钟甚至数小时。让标准的 HTTP 请求在此期间阻塞是不可接受的——这会降低用户体验,占用服务器资源,并且常常导致 Nginx 等反向代理超时。

标准的解决方案是将这些重负载操作交给分布式任务队列 Celery 来处理。这将执行过程与 Web 服务器解耦。然而,这种解耦引入了一个新挑战:可观测性。任务一旦被发送到后台,Web 服务器就失去了对它的跟踪。我们如何告诉用户他们的报告已经完成了 50 %?

传统做法是使用短轮询(JavaScript 每秒请求一次 /status),这会向服务器发送大量冗余请求。更优的架构是利用 Flask‑SocketIO 将后台工作进程的实时更新直接推送给客户端。本文详细阐述了在 Celery 工作进程与 Flask‑SocketIO Web 服务器之间搭建桥梁所需的工程实现。

Example: “Data Export” feature

步骤描述
触发用户点击 Export Data(导出数据)。
确认服务器立即返回 HTTP 202 Accepted 响应,表示任务已启动。
执行后端查询数据库并生成大型 CSV 文件。
反馈用户实时看到进度条从 0 % 移动到 100 %,完成后出现下载链接。

没有 WebSocket 时,前端只能盲目轮询服务器。
使用 WebSocket 时,后台工作进程在状态变化时主动发送事件(progress: 10 %progress: 20 %),从而减少网络开销并为应用提供“实时”的体验。

进程分离

  1. Web 服务器 (Flask + Gunicorn + Eventlet) – 管理与浏览器的活跃 WebSocket 连接,并持有打开的 TCP 套接字的文件描述符。
  2. Worker (Celery) – 完全独立的操作系统进程,可能运行在不同的服务器上。它 无法访问 Web 服务器持有的 WebSocket 连接,因此不能直接“与”用户的浏览器通信。

如果你在 Celery 任务中导入主 Flask 应用中的全局 socketio 实例并调用 .emit(),它要么会静默失败,要么会抛出错误。Celery worker 运行在自己的内存空间中,无法感知连接到 Web 服务器的客户端。

桥梁:消息中间件

我们使用中间消息中间件(通常是 RedisRabbitMQ)作为共享事件总线。

# Flask‑SocketIO external emitter
SocketIO(message_queue='redis://localhost:6379/0')

Flask‑SocketIO 的 External Emitters 解决了隔离问题。通过在 SocketIO 类中配置 message_queue 参数,我们创建了一个 只写 客户端,它连接到中间件而不是保持客户端连接。

  • 当 Celery worker 调用 emit() 时,库会将事件序列化并发布到 Redis 的 Pub/Sub 通道(例如 flask-socketio)。
  • 订阅该通道的 Web 服务器进程会收到消息,解码后确定目标客户端,并通过实际的 WebSocket 连接转发负载。

因此,任何共享同一 Redis 后端的进程都可以向网页客户端发送消息。

常见陷阱:“在应用上下文之外工作”

RuntimeError: Working outside of application context.

出现此错误是因为 Flask 扩展期望有一个活动的 Flask 应用(或请求)上下文。

错误原因 – 在 tasks.py 中导入绑定到 Flask 应用的 socketio 对象并在 Celery worker 中使用它。worker 没有运行 Flask 应用,因此上下文绑定的全局变量失效。

解决方案 – 在 Celery 模块内部实例化一个 独立的 SocketIO 对象。该对象 不需要 Flask 应用,只需提供消息队列的连接字符串。使用独立的 emitter 可以完全绕过在 worker 中需要 Flask 应用上下文的问题,使后台任务更简洁、更可靠。

生产‑就绪模式

下面是一个最小的、可用于生产环境的示例,演示了如何将 Celery 与 Flask‑SocketIO 集成。

app.py – Web 服务器(Flask + SocketIO)

from flask import Flask, request
from flask_socketio import SocketIO
from celery_worker import make_celery

app = Flask(__name__)

# Configuration
app.config['SECRET_KEY'] = 'secret!'
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Initialise SocketIO with a message queue (Redis)
socketio = SocketIO(app, message_queue=app.config['CELERY_BROKER_URL'])

# Initialise Celery
celery = make_celery(app)


@app.route('/start-task', methods=['POST'])
def start_task():
    """
    Starts a long‑running background task.
    The client’s SocketIO session ID (sid) is passed to the task
    so we can target the correct browser later.
    """
    data = request.json.get('data')
    sid = request.sid  # Flask‑SocketIO injects this attribute
    task = celery.send_task('tasks.long_running_task', args=[data, sid])
    return {'task_id': task.id}, 202


if __name__ == '__main__':
    # `socketio.run` starts the Flask app with the appropriate async mode
    socketio.run(app, debug=True)

celery_worker.py – Celery 工厂

from celery import Celery

def make_celery(app):
    """
    Creates a Celery instance that uses the Flask app’s configuration.
    """
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    # Optional: make Flask app context available inside tasks
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

tasks.py – Celery 工作进程(独立的 SocketIO 发射器)

import time
from celery import Celery
from flask_socketio import SocketIO

# ----------------------------------------------------------------------
# 1️⃣  Message‑queue URL (Redis)
# ----------------------------------------------------------------------
REDIS_URL = 'redis://localhost:6379/0'

# ----------------------------------------------------------------------
# 2️⃣  Celery instance (standard configuration)
# ----------------------------------------------------------------------
celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)

# ----------------------------------------------------------------------
# 3️⃣  Stand‑alone SocketIO emitter
# ----------------------------------------------------------------------
# No Flask app is passed – this is an *external* emitter that writes
# to the Redis message queue.
socketio_emitter = SocketIO(message_queue=REDIS_URL)


@celery.task(name='tasks.long_running_task')
def long_running_task(payload, sid):
    """
    Simulates a long‑running job and pushes progress updates
    to the client identified by `sid`.
    """
    total_steps = 5
    for step in range(1, total_steps + 1):
        # Simulate work
        time.sleep(2)

        # Compute progress percentage
        progress = int(step / total_steps * 100)

        # Emit progress to the specific client
        socketio_emitter.emit(
            'task_progress',
            {'progress': progress, 'task_id': long_running_task.request.id},
            room=sid  # Target the correct browser session
        )

    # Final notification – task completed
    socketio_emitter.emit(
        'task_complete',
        {'message': 'Export ready', 'task_id': long_running_task.request.id},
        room=sid
    )

回顾

您需要的内容为什么重要
在工作进程中使用独立的 SocketIO 发射器避免 “在应用上下文之外工作” 的错误。
使用 Redis(或 RabbitMQ)作为消息队列为 Web 服务器和工作进程提供共享的 Pub/Sub 通道。
将客户端的 sid 传递给任务使工作进程在发射时能够针对特定的浏览器会话。
使用 socketio.emit(..., room=sid)只将更新发送给目标客户端,而不是所有人。

通过遵循上述模式,您将获得:

  • 实时进度反馈,无需轮询。
  • 可扩展的架构 —— 工作进程可以运行在不同机器上。
  • 职责清晰分离 —— Web 服务器处理连接,工作进程处理繁重任务。

编码愉快! 🚀

Celery 任务示例

import time
from celery import Celery
from flask_socketio import SocketIO

REDIS_URL = 'redis://localhost:6379/0'

celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
socketio_emitter = SocketIO(message_queue=REDIS_URL)


@celery.task(name='tasks.long_running_task')
def long_running_task(data, user_sid):
    """
    Background task that updates the user on progress.
    """
    total_steps = 5
    for i in range(total_steps):
        time.sleep(1)  # Simulate work
        progress = int((i + 1) / total_steps * 100)

        # Emit to the specific user's room (their session ID)
        socketio_emitter.emit(
            'progress',
            {'percent': progress, 'status': 'Processing...'},
            room=user_sid,
        )

    # Final completion event
    socketio_emitter.emit('completion', {'result': 'Done!'}, room=user_sid)
    return "OK"

架构考虑

虽然此架构功能强大,但也会带来必须管理的复杂性。

延迟

  • 每个由 Celery 发出的事件都会先经过一次网络跳转到 Redis,随后被处理、被 Web 服务器接收,最后发送给客户端。
  • 典型延迟为 低于 10 ms,高于直接内存内发射的情况。

投递保证

  • Redis Pub/Sub 是 一次性发送(fire‑and‑forget)。
  • 如果 Web 服务器在 Celery 工作进程发出消息的瞬间重启或暂时失去与 Redis 的连接,则该更新将会丢失。
  • 对于关键通知,建议实现 持久化收件箱确认机制

依赖管理

  • Web 服务器和 Celery 工作进程必须使用兼容版本的 Flask‑SocketIORedis
  • 序列化协议不匹配可能导致消息被静默忽略。

扩展工作进程

  • 该架构能够轻松水平扩展。
  • 只要所有工作进程指向相同的消息队列 Redis URL,添加 100 个 Celery 工作进程就非常直接;它们都可以无缝地向客户端推送更新。

图片库

(在此插入图片)


桥接 Celery 与 Flask‑SocketIO

桥接 Celery 与 Flask‑SocketIO 使您能够构建响应迅速、专业级的应用程序,在处理繁重任务时不会让用户感到困惑。关键在于理解 worker(工作进程)和 web server(Web 服务器)是独立的实体,必须通过一个中立的第三方——Redis 消息队列进行通信。

生产就绪检查清单

  • Redis 配置

    • 确保在 Flask 应用和独立的 Celery SocketIO 实例中,Redis 被配置为 message_queue
  • 用户定位

    • request.sid(或专用的房间名称)传递给 Celery 任务,以定位特定用户。
  • SocketIO 初始化

    • 不要在 Celery 工作进程中将 Flask 应用对象传递给 SocketIO 构造函数。
  • 猴子补丁(如适用)

    • 在 Web 服务器的入口点进行猴子补丁(例如针对 Gevent/Eventlet)。
Back to Blog

相关文章

阅读更多 »