Async Task Integration: Connecting Celery with Flask-SocketIO

Published: (December 18, 2025 at 05:30 PM EST)
7 min read
Source: Dev.to

Introduction

In modern web‑application development, the synchronous request‑response cycle is often a bottleneck. Operations such as generating complex PDF reports, processing video uploads, or training machine‑learning models can take minutes or even hours. Blocking a standard HTTP request for this duration is unacceptable—it degrades user experience, ties up server resources, and often leads to time‑outs from reverse proxies like Nginx.

The standard solution is to offload these heavy operations to a distributed task queue like Celery. This decouples the execution from the web server. However, this decoupling introduces a new challenge: observability. Once a task is sent to the background, the web server loses track of it. How do we tell the user that their report is 50 % complete?

Traditionally, developers used short‑polling (JavaScript requesting /status every second), which floods the server with redundant requests. A far superior architecture utilizes Flask‑SocketIO to push real‑time updates from the background worker directly to the client. This article details the engineering required to bridge the gap between Celery worker processes and the Flask‑SocketIO web server.

Example: “Data Export” feature

| Step | Description |
|------|-------------|
| Trigger | The user clicks Export Data. |
| Acknowledgment | The server immediately returns an HTTP 202 Accepted response, signalling that the task has started. |
| Execution | The backend queries the database and formats a large CSV file. |
| Feedback | The user sees a progress bar moving from 0 % to 100 % in real‑time, followed by a download link upon completion. |

Without WebSockets the frontend must blindly poll the server.
With WebSockets the background worker proactively emits events (progress: 10 %, progress: 20 %) only when state changes occur. This reduces network overhead and provides a “live” feel to the application.
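
To illustrate "emit only when state changes", here is a minimal sketch of a progress reporter that calls an emit callback only when the whole-number percentage moves. The class name ProgressReporter and the callback signature are assumptions made for this example; in a real worker the callback would wrap the SocketIO emitter.

```python
from typing import Callable, Dict, List


class ProgressReporter:
    """Calls `emit` only when the whole-number percentage changes."""

    def __init__(self, total: int, emit: Callable[[str, Dict], None]) -> None:
        if total <= 0:
            raise ValueError("total must be positive")
        self.total = total
        self.emit = emit
        self._last_percent = -1  # nothing reported yet

    def update(self, done: int) -> None:
        percent = min(100, done * 100 // self.total)
        if percent != self._last_percent:
            self._last_percent = percent
            self.emit("task_progress", {"progress": percent})


# Usage: 1,000 rows are processed, but only 101 events (0 to 100) are emitted.
sent: List[Dict] = []
reporter = ProgressReporter(total=1000, emit=lambda event, data: sent.append(data))
for row in range(1, 1001):
    reporter.update(row)

print(len(sent))
```

Compared with a client polling once per second, the number of messages here depends on how often the state changes, not on how long the job runs.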


Process Separation

  1. Web Server (Flask + Gunicorn + Eventlet) – Manages active WebSocket connections with the browser and holds the file descriptors for the open TCP sockets.
  2. Worker (Celery) – A completely separate OS process, potentially on a different server. It has no access to the WebSocket connections held by the Web Server and therefore cannot directly “speak” to the user’s browser.

If you import the global socketio instance from your main Flask app into a Celery task and call .emit(), it will either fail silently or raise errors. The Celery worker lives in its own memory space and does not know about the clients connected to the Web Server.

The Bridge: A Message Broker

We use an intermediary message broker (typically Redis or RabbitMQ) as a shared event bus.

from flask_socketio import SocketIO

# External emitter: no Flask app is passed, only the message-queue URL
socketio_emitter = SocketIO(message_queue='redis://localhost:6379/0')

Flask‑SocketIO’s External Emitters solve the isolation problem. By configuring the SocketIO class with a message_queue argument, we create a write‑only client that connects to the broker instead of holding client connections.

  • When a Celery worker calls emit(), the library serialises the event and publishes it to the Redis Pub/Sub channel (e.g., flask-socketio).
  • The Web Server processes, which are subscribed to this channel, receive the message, decode it, identify the target client(s), and forward the payload over the actual WebSocket connection.

Thus any process that shares the same Redis backend can send messages to web clients.
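
The flow described above can be sketched without Redis at all. The following is an in-memory simulation, not the library's implementation: InMemoryBroker, ExternalEmitter, WebServer, and the JSON message layout are all hypothetical stand-ins chosen to show who holds the connections and who only publishes.

```python
import json
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, List

CHANNEL = "flask-socketio"  # channel name taken from the text above


class InMemoryBroker:
    """Stand-in for Redis Pub/Sub: delivers each message to current subscribers."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, channel: str, callback: Callable[[str], None]) -> None:
        self._subscribers[channel].append(callback)

    def publish(self, channel: str, message: str) -> None:
        for callback in list(self._subscribers[channel]):
            callback(message)


class ExternalEmitter:
    """Worker side: knows the broker, holds no client connections."""

    def __init__(self, broker: InMemoryBroker) -> None:
        self._broker = broker

    def emit(self, event: str, data: dict, room: str) -> None:
        # Hypothetical wire format; the real library uses its own encoding.
        message = json.dumps({"event": event, "data": data, "room": room})
        self._broker.publish(CHANNEL, message)


class WebServer:
    """Web side: owns the connections and forwards messages from the broker."""

    def __init__(self, broker: InMemoryBroker) -> None:
        self.delivered: Dict[str, List[dict]] = {}  # sid -> events "sent" to that client
        broker.subscribe(CHANNEL, self._on_message)

    def connect(self, sid: str) -> None:
        self.delivered[sid] = []

    def _on_message(self, raw: str) -> None:
        message = json.loads(raw)
        outbox = self.delivered.get(message["room"])
        if outbox is not None:  # ignore rooms this server does not hold
            outbox.append({"event": message["event"], "data": message["data"]})


broker = InMemoryBroker()
server = WebServer(broker)
server.connect("abc123")
emitter = ExternalEmitter(broker)
emitter.emit("task_progress", {"progress": 50}, room="abc123")
emitter.emit("task_progress", {"progress": 50}, room="unknown")  # dropped by the server
print(server.delivered)
```

The emitter never touches server.delivered directly; the only thing the two sides share is the broker, which mirrors the role Redis plays between the Celery worker and the web server.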


Common Pitfall: “Working outside of application context”

RuntimeError: Working outside of application context.

This occurs because Flask extensions expect an active Flask application (or request) context.

The mistake – importing the socketio object attached to the Flask app in tasks.py and using it inside a Celery worker. The worker has no Flask app running, so the context‑bound globals fail.

The solution – instantiate a stand‑alone SocketIO object inside the Celery module. This object does not need the Flask app; it only needs the connection string to the message queue. By using a stand‑alone emitter, you bypass the need for a Flask application context entirely within the worker, making background tasks cleaner and more robust.


Production‑Ready Pattern

Below is a minimal, production‑ready example that demonstrates how to integrate Celery with Flask‑SocketIO.

app.py – Web server (Flask + SocketIO)

from flask import Flask, request
from flask_socketio import SocketIO
from celery_worker import make_celery

app = Flask(__name__)

# Configuration
app.config['SECRET_KEY'] = 'secret!'
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Initialise SocketIO with a message queue (Redis)
socketio = SocketIO(app, message_queue=app.config['CELERY_BROKER_URL'])

# Initialise Celery
celery = make_celery(app)


@app.route('/start-task', methods=['POST'])
def start_task():
    """
    Starts a long-running background task.
    The client sends its SocketIO session ID (sid) in the JSON body
    so the task can target the correct browser later.
    """
    payload = request.get_json(silent=True) or {}
    data = payload.get('data')
    # request.sid exists only inside SocketIO event handlers, not in a
    # plain HTTP route, so the sid has to be supplied by the client.
    sid = payload.get('sid')
    if not sid:
        return {'error': 'sid is required'}, 400
    task = celery.send_task('tasks.long_running_task', args=[data, sid])
    return {'task_id': task.id}, 202


if __name__ == '__main__':
    # `socketio.run` starts the Flask app with the appropriate async mode.
    # debug=True is for local development only; disable it in production.
    socketio.run(app, debug=True)

celery_worker.py – Celery factory

from celery import Celery

def make_celery(app):
    """
    Creates a Celery instance that uses the Flask app’s configuration.
    """
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    # Optional: make Flask app context available inside tasks
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

tasks.py – Celery worker (stand‑alone SocketIO emitter)

import time
from celery import Celery
from flask_socketio import SocketIO

# ----------------------------------------------------------------------
# 1️⃣  Message‑queue URL (Redis)
# ----------------------------------------------------------------------
REDIS_URL = 'redis://localhost:6379/0'

# ----------------------------------------------------------------------
# 2️⃣  Celery instance (standard configuration)
# ----------------------------------------------------------------------
celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)

# ----------------------------------------------------------------------
# 3️⃣  Stand‑alone SocketIO emitter
# ----------------------------------------------------------------------
# No Flask app is passed – this is an *external* emitter that writes
# to the Redis message queue.
socketio_emitter = SocketIO(message_queue=REDIS_URL)


@celery.task(name='tasks.long_running_task')
def long_running_task(payload, sid):
    """
    Simulates a long‑running job and pushes progress updates
    to the client identified by `sid`.
    """
    total_steps = 5
    for step in range(1, total_steps + 1):
        # Simulate work
        time.sleep(2)

        # Compute progress percentage
        progress = int(step / total_steps * 100)

        # Emit progress to the specific client
        socketio_emitter.emit(
            'task_progress',
            {'progress': progress, 'task_id': long_running_task.request.id},
            room=sid  # Target the correct browser session
        )

    # Final notification – task completed
    socketio_emitter.emit(
        'task_complete',
        {'message': 'Export ready', 'task_id': long_running_task.request.id},
        room=sid
    )

Recap

| What you need | Why it matters |
|---------------|----------------|
| Separate SocketIO emitter in the worker | Avoids “working outside of application context” errors. |
| Redis (or RabbitMQ) as a message queue | Provides a shared Pub/Sub channel for the web server and workers. |
| Pass the client’s sid to the task | Allows the worker to target a specific browser session when emitting. |
| Use socketio_emitter.emit(..., room=sid) | Sends the update only to the intended client, not to everyone. |

By following the pattern above, you get:

  • Real‑time progress feedback without polling.
  • Scalable architecture – workers can run on separate machines.
  • Clean separation of concerns – the web server handles connections; the worker handles heavy lifting.

Happy coding! 🚀


Celery Task Example

import time
from celery import Celery
from flask_socketio import SocketIO

REDIS_URL = 'redis://localhost:6379/0'

celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
socketio_emitter = SocketIO(message_queue=REDIS_URL)


@celery.task(name='tasks.long_running_task')
def long_running_task(data, user_sid):
    """
    Background task that updates the user on progress.
    """
    total_steps = 5
    for i in range(total_steps):
        time.sleep(1)  # Simulate work
        progress = int((i + 1) / total_steps * 100)

        # Emit to the specific user's room (their session ID)
        socketio_emitter.emit(
            'progress',
            {'percent': progress, 'status': 'Processing...'},
            room=user_sid,
        )

    # Final completion event
    socketio_emitter.emit('completion', {'result': 'Done!'}, room=user_sid)
    return "OK"

Architectural Considerations

While this architecture is powerful, it introduces complexity that must be managed.

Latency

  • Every event emitted from Celery makes a network hop to Redis, is processed, picked up by the web server, and then sent to the client.
  • The added latency is typically small (under 10 ms), but it is still higher than a direct in‑memory emit.

Delivery Guarantees

  • Redis Pub/Sub is fire‑and‑forget.
  • If the web server restarts or temporarily loses connection to Redis exactly when the Celery worker emits a message, that update will be lost.
  • For critical notifications, consider implementing a persistent inbox or an acknowledgment system.
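
One way to picture the "persistent inbox with acknowledgment" idea is the sketch below. It is an assumption-laden illustration: the Inbox class and its method names are hypothetical, and it keeps messages in a dictionary, whereas a real inbox would live in a durable store such as a database so that it survives restarts.

```python
import itertools
from collections import defaultdict
from typing import DefaultDict, Dict, List, Tuple


class Inbox:
    """Keeps each notification until the client acknowledges it."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        # user_id -> {message_id: payload}; in-memory only for this sketch
        self._pending: DefaultDict[str, Dict[int, dict]] = defaultdict(dict)

    def add(self, user_id: str, payload: dict) -> int:
        """Store a notification before emitting it; returns its id."""
        message_id = next(self._ids)
        self._pending[user_id][message_id] = payload
        return message_id

    def ack(self, user_id: str, message_id: int) -> bool:
        """Drop a notification once the client confirms receipt."""
        return self._pending[user_id].pop(message_id, None) is not None

    def pending(self, user_id: str) -> List[Tuple[int, dict]]:
        """Everything still unacknowledged, oldest first, for replay on reconnect."""
        return sorted(self._pending[user_id].items())


inbox = Inbox()
first = inbox.add("user-1", {"event": "task_progress", "progress": 50})
second = inbox.add("user-1", {"event": "task_complete", "message": "Export ready"})
inbox.ack("user-1", first)          # client confirmed the progress update
replay = inbox.pending("user-1")    # the completion event was never confirmed
print(replay)
```

With this shape, the worker stores the notification first and emits second; if the Pub/Sub message is lost, the client can still fetch anything unacknowledged when it reconnects.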

Dependency Management

  • Both the web server and the Celery worker must rely on compatible versions of Flask‑SocketIO and Redis.
  • A mismatch in serialization protocols can cause messages to be silently ignored.

Scaling Workers

  • The architecture scales horizontally with ease.
  • Adding 100 Celery workers is straightforward as long as they all point to the same message‑queue Redis URL; they can all push updates to clients seamlessly.
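
Keeping every process on the same queue URL is easier when the URL comes from one place. A minimal sketch, assuming an environment variable named SOCKETIO_MESSAGE_QUEUE (a name invented for this example, not something Flask‑SocketIO or Celery reads on its own):

```python
import os
from typing import Mapping

DEFAULT_QUEUE_URL = "redis://localhost:6379/0"


def message_queue_url(environ: Mapping[str, str] = os.environ) -> str:
    """Return the queue URL shared by the web server and every worker.

    SOCKETIO_MESSAGE_QUEUE is a hypothetical variable name for this sketch.
    """
    return environ.get("SOCKETIO_MESSAGE_QUEUE", DEFAULT_QUEUE_URL)


local_url = message_queue_url({})
shared_url = message_queue_url({"SOCKETIO_MESSAGE_QUEUE": "redis://queue.internal:6379/1"})
print(local_url, shared_url)
```

Both app.py and tasks.py could then pass message_queue_url() as the message_queue argument instead of repeating a hard-coded string.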


Bridging Celery and Flask‑SocketIO

Bridging Celery and Flask‑SocketIO allows you to build responsive, professional‑grade applications that handle heavy lifting without leaving the user in the dark. The key lies in understanding that the worker and the web server are distinct entities that must communicate via a neutral third party—the Redis message queue.


Production‑Readiness Checklist

  • Redis Configuration
    • Ensure Redis is configured as the message_queue on both the Flask app and the stand‑alone SocketIO instance in the Celery worker.
  • User Targeting
    • Pass request.sid (available inside SocketIO event handlers) or a dedicated room name to the Celery task to target specific users.
  • SocketIO Initialization
    • Do not pass the Flask app object to the SocketIO constructor in the Celery worker.
  • Monkey‑Patching (if applicable)
    • Apply monkey‑patching (e.g., for Gevent/Eventlet) at the entry point of your web server.