Async Task Integration: Connecting Celery with Flask-SocketIO
Source: Dev.to
Introduction
In modern web‑application development, the synchronous request‑response cycle is often a bottleneck. Operations such as generating complex PDF reports, processing video uploads, or training machine‑learning models can take minutes or even hours. Blocking a standard HTTP request for this duration is unacceptable—it degrades user experience, ties up server resources, and often leads to time‑outs from reverse proxies like Nginx.
The standard solution is to offload these heavy operations to a distributed task queue like Celery. This decouples the execution from the web server. However, this decoupling introduces a new challenge: observability. Once a task is sent to the background, the web server loses track of it. How do we tell the user that their report is 50 % complete?
Traditionally, developers used short‑polling (JavaScript requesting /status every second), which floods the server with redundant requests. A far superior architecture utilizes Flask‑SocketIO to push real‑time updates from the background worker directly to the client. This article details the engineering required to bridge the gap between Celery worker processes and the Flask‑SocketIO web server.
Example: “Data Export” feature
| Step | Description |
|---|---|
| Trigger | The user clicks Export Data. |
| Acknowledgment | The server immediately returns an HTTP 202 Accepted response, signalling that the task has started. |
| Execution | The backend queries the database and formats a large CSV file. |
| Feedback | The user sees a progress bar moving from 0 % to 100 % in real‑time, followed by a download link upon completion. |
Without WebSockets the frontend must blindly poll the server.
With WebSockets the background worker proactively emits events (progress: 10 %, progress: 20 %) only when state changes occur. This reduces network overhead and provides a “live” feel to the application.
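Emitting only when the state changes can be sketched with a small helper that remembers the last percentage it sent. This is an illustrative sketch, not part of Flask‑SocketIO: `ProgressReporter` and its `emit` callable are hypothetical names, and the lambda stands in for a real SocketIO emit call.

```python
from typing import Callable, Optional


class ProgressReporter:
    """Forwards a progress event only when the integer percentage changes."""

    def __init__(self, emit: Callable[[str, dict], None], total: int) -> None:
        if total <= 0:
            raise ValueError("total must be positive")
        self._emit = emit          # hypothetical stand-in for a SocketIO emit
        self._total = total
        self._last_percent: Optional[int] = None

    def update(self, done: int) -> bool:
        """Report `done` completed units; return True if an event was emitted."""
        percent = min(100, max(0, done * 100 // self._total))
        if percent == self._last_percent:
            return False  # nothing visible changed, so stay silent
        self._last_percent = percent
        self._emit("progress", {"percent": percent})
        return True


# Usage: 1,000 rows are processed, but far fewer events reach the client.
sent = []
reporter = ProgressReporter(lambda event, data: sent.append((event, data)), total=1000)
for row in range(1, 1001):
    reporter.update(row)
```

In this run the worker calls `update` 1,000 times, yet only the calls that change the displayed percentage produce an event.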
Process Separation
- Web Server (Flask + Gunicorn + Eventlet) – Manages active WebSocket connections with the browser and holds the file descriptors for the open TCP sockets.
- Worker (Celery) – A completely separate OS process, potentially on a different server. It has no access to the WebSocket connections held by the Web Server and therefore cannot directly “speak” to the user’s browser.
If you import the global socketio instance from your main Flask app into a Celery task and call .emit(), it will either fail silently or raise errors. The Celery worker lives in its own memory space and does not know about the clients connected to the Web Server.
The Bridge: A Message Broker
We use an intermediary message broker (typically Redis or RabbitMQ) as a shared event bus.
# Flask‑SocketIO external emitter
SocketIO(message_queue='redis://localhost:6379/0')
Flask‑SocketIO’s External Emitters solve the isolation problem. By configuring the SocketIO class with a message_queue argument, we create a write‑only client that connects to the broker instead of holding client connections.
- When a Celery worker calls emit(), the library serialises the event and publishes it to the Redis Pub/Sub channel (e.g., flask-socketio).
- The Web Server processes, which are subscribed to this channel, receive the message, decode it, identify the target client(s), and forward the payload over the actual WebSocket connection.
Thus any process that shares the same Redis backend can send messages to web clients.
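The publish/forward flow described above can be imitated with the standard library alone. The sketch below is a simplified model, not how Flask‑SocketIO is implemented: an in‑process `queue.Queue` stands in for the Redis Pub/Sub channel, and `worker_emit` and `WebServer` are hypothetical names chosen for illustration.

```python
import json
import queue
from typing import Callable, Dict

# Stand-in for a Redis Pub/Sub channel: one queue shared by both sides.
channel: "queue.Queue[str]" = queue.Queue()


def worker_emit(event: str, data: dict, room: str) -> None:
    """Worker side: serialise the event and publish it; no sockets involved."""
    channel.put(json.dumps({"event": event, "data": data, "room": room}))


class WebServer:
    """Web-server side: owns the client connections and drains the channel."""

    def __init__(self) -> None:
        # Maps a session ID to a callable that writes to that client's socket.
        self.clients: Dict[str, Callable[[str, dict], None]] = {}

    def drain(self) -> int:
        """Forward each queued message to its target client; return how many were delivered."""
        delivered = 0
        while True:
            try:
                raw = channel.get_nowait()
            except queue.Empty:
                return delivered
            message = json.loads(raw)
            send = self.clients.get(message["room"])
            if send is not None:  # messages for unknown clients are dropped
                send(message["event"], message["data"])
                delivered += 1


received = []
server = WebServer()
server.clients["abc123"] = lambda event, data: received.append((event, data))

worker_emit("task_progress", {"progress": 50}, room="abc123")
worker_emit("task_progress", {"progress": 50}, room="someone-else")
delivered = server.drain()
```

The worker never touches `server.clients`; it only writes to the shared channel, which is the property that lets it run in a separate process or on a separate machine.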
Common Pitfall: “Working outside of application context”
RuntimeError: Working outside of application context.
This occurs because Flask extensions expect an active Flask application (or request) context.
The mistake – importing the socketio object attached to the Flask app in tasks.py and using it inside a Celery worker. The worker has no Flask app running, so the context‑bound globals fail.
The solution – instantiate a stand‑alone SocketIO object inside the Celery module. This object does not need the Flask app; it only needs the connection string to the message queue. By using a stand‑alone emitter, you bypass the need for a Flask application context entirely within the worker, making background tasks cleaner and more robust.
Production‑Ready Pattern
Below is a minimal, production‑ready example that demonstrates how to integrate Celery with Flask‑SocketIO.
app.py – Web server (Flask + SocketIO)
from flask import Flask, request
from flask_socketio import SocketIO
from celery_worker import make_celery

app = Flask(__name__)

# Configuration
app.config['SECRET_KEY'] = 'secret!'
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Initialise SocketIO with a message queue (Redis)
socketio = SocketIO(app, message_queue=app.config['CELERY_BROKER_URL'])

# Initialise Celery
celery = make_celery(app)


@app.route('/start-task', methods=['POST'])
def start_task():
    """
    Starts a long‑running background task.
    The client’s SocketIO session ID (sid) is passed to the task
    so we can target the correct browser later.
    """
    body = request.get_json(silent=True) or {}
    data = body.get('data')
    # `request.sid` exists only inside SocketIO event handlers, not in plain
    # HTTP routes, so the client sends its own socket ID in the request body.
    sid = body.get('sid')
    if not sid:
        return {'error': 'sid is required'}, 400
    task = celery.send_task('tasks.long_running_task', args=[data, sid])
    return {'task_id': task.id}, 202


if __name__ == '__main__':
    # `socketio.run` starts the Flask app with the appropriate async mode
    socketio.run(app, debug=True)
celery_worker.py – Celery factory
from celery import Celery


def make_celery(app):
    """
    Creates a Celery instance that uses the Flask app’s configuration.
    """
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    # Optional: make Flask app context available inside tasks
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
tasks.py – Celery worker (stand‑alone SocketIO emitter)
import time
from celery import Celery
from flask_socketio import SocketIO

# ----------------------------------------------------------------------
# 1️⃣ Message‑queue URL (Redis)
# ----------------------------------------------------------------------
REDIS_URL = 'redis://localhost:6379/0'

# ----------------------------------------------------------------------
# 2️⃣ Celery instance (standard configuration)
# ----------------------------------------------------------------------
celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)

# ----------------------------------------------------------------------
# 3️⃣ Stand‑alone SocketIO emitter
# ----------------------------------------------------------------------
# No Flask app is passed – this is an *external* emitter that writes
# to the Redis message queue.
socketio_emitter = SocketIO(message_queue=REDIS_URL)


@celery.task(name='tasks.long_running_task')
def long_running_task(payload, sid):
    """
    Simulates a long‑running job and pushes progress updates
    to the client identified by `sid`.
    """
    total_steps = 5
    for step in range(1, total_steps + 1):
        # Simulate work
        time.sleep(2)
        # Compute progress percentage
        progress = int(step / total_steps * 100)
        # Emit progress to the specific client
        socketio_emitter.emit(
            'task_progress',
            {'progress': progress, 'task_id': long_running_task.request.id},
            room=sid  # Target the correct browser session
        )
    # Final notification – task completed
    socketio_emitter.emit(
        'task_complete',
        {'message': 'Export ready', 'task_id': long_running_task.request.id},
        room=sid
    )
Recap
| What you need | Why it matters |
|---|---|
| Separate SocketIO emitter in the worker | Avoids “working outside of application context” errors. |
| Redis (or RabbitMQ) as a message queue | Provides a shared Pub/Sub channel for the web server and workers. |
| Pass the client’s sid to the task | Allows the worker to target a specific browser session when emitting. |
| Use socketio.emit(..., room=sid) | Sends the update only to the intended client, not to everyone. |
By following the pattern above, you get:
- Real‑time progress feedback without polling.
- Scalable architecture – workers can run on separate machines.
- Clean separation of concerns – the web server handles connections; the worker handles heavy lifting.
Happy coding! 🚀
Celery Task Example
import time
from celery import Celery
from flask_socketio import SocketIO

REDIS_URL = 'redis://localhost:6379/0'
celery = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
socketio_emitter = SocketIO(message_queue=REDIS_URL)


@celery.task(name='tasks.long_running_task')
def long_running_task(data, user_sid):
    """
    Background task that updates the user on progress.
    """
    total_steps = 5
    for i in range(total_steps):
        time.sleep(1)  # Simulate work
        progress = int((i + 1) / total_steps * 100)
        # Emit to the specific user's room (their session ID)
        socketio_emitter.emit(
            'progress',
            {'percent': progress, 'status': 'Processing...'},
            room=user_sid,
        )
    # Final completion event
    socketio_emitter.emit('completion', {'result': 'Done!'}, room=user_sid)
    return "OK"
Architectural Considerations
While this architecture is powerful, it introduces complexity that must be managed.
Latency
- Every event emitted from Celery makes a network hop to Redis, is processed, picked up by the web server, and then sent to the client.
- This round trip typically adds less than 10 ms. That is a small cost, but it is still more than a direct in‑memory emit.
Delivery Guarantees
- Redis Pub/Sub is fire‑and‑forget.
- If the web server restarts or temporarily loses connection to Redis exactly when the Celery worker emits a message, that update will be lost.
- For critical notifications, consider implementing a persistent inbox or an acknowledgment system.
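One possible shape for such a persistent inbox is to record the latest event per task before attempting the push, so a client that missed it can fetch the stored copy after reconnecting. The sketch below makes several assumptions: `ProgressInbox` is a hypothetical helper, a plain dict stands in for durable storage, and the raising `flaky_emit` simulates a failed publish (a real Pub/Sub loss is usually silent, which the stored copy covers equally well).

```python
import time
from typing import Callable, Dict, Optional


class ProgressInbox:
    """Keeps the latest event per task so a missed push can be recovered."""

    def __init__(self, emit: Callable[[str, dict], None]) -> None:
        self._emit = emit
        # Stand-in for durable storage (e.g. a Redis hash or a database table).
        self._latest: Dict[str, dict] = {}

    def publish(self, task_id: str, event: str, data: dict) -> None:
        """Record the event first, then attempt the fire-and-forget push."""
        self._latest[task_id] = {"event": event, "data": data, "at": time.time()}
        try:
            self._emit(event, data)
        except Exception:
            # The push is best effort; the stored copy is the source of truth.
            pass

    def latest(self, task_id: str) -> Optional[dict]:
        """Return what a reconnecting client would fetch to catch up."""
        return self._latest.get(task_id)


def flaky_emit(event: str, data: dict) -> None:
    raise ConnectionError("broker unreachable")  # simulate a lost message


inbox = ProgressInbox(flaky_emit)
inbox.publish("task-1", "task_complete", {"message": "Export ready"})
snapshot = inbox.latest("task-1")
```

Because the record is written before the push, the completion notice survives even though the emit failed; the client can ask for `latest("task-1")` over plain HTTP when it reconnects.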
Dependency Management
- Both the web server and the Celery worker must rely on compatible versions of Flask‑SocketIO and Redis.
- A mismatch in serialization protocols can cause messages to be silently ignored.
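A simple way to spot version drift is to log the installed package versions at startup on both the web server and the worker, then compare the two logs. This is a minimal sketch using only the standard library; the helper name `installed_versions` is hypothetical, and the list of distribution names is an assumption about which packages matter in this stack.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Distribution names assumed to be relevant; adjust to your own requirements.
report = installed_versions(["Flask-SocketIO", "python-socketio", "redis", "celery"])
```

Printing `report` in both processes makes a mismatch visible before it shows up as silently ignored messages.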
Scaling Workers
- The architecture scales horizontally with ease.
- Adding 100 Celery workers is straightforward as long as they all point to the same message‑queue Redis URL; they can all push updates to clients seamlessly.
Bridging Celery and Flask‑SocketIO
Bridging Celery and Flask‑SocketIO allows you to build responsive, professional‑grade applications that handle heavy lifting without leaving the user in the dark. The key lies in understanding that the worker and the web server are distinct entities that must communicate via a neutral third party—the Redis message queue.
Production‑Readiness Checklist
- Redis Configuration
  - Ensure Redis is configured as the message_queue on both the Flask app and the standalone Celery SocketIO instance.
- User Targeting
  - Pass the client’s SocketIO session ID (request.sid, available inside SocketIO event handlers) or a dedicated room name to the Celery task to target specific users.
- SocketIO Initialization
  - Do not pass the Flask app object to the SocketIO constructor in the Celery worker.
- Monkey‑Patching (if applicable)
  - Apply monkey‑patching (e.g., for Gevent/Eventlet) at the very top of your web server’s entry point, before any other imports.