使用 Celery 和 Redis 进行扩展
Source: Dev.to
同步索引的问题
当用户开始索引 YouTube 播放列表时,后端运行了一个 同步循环,逐个处理每个视频。
获取并索引转录本大约需要 1.5 秒,因此包含 1,000 个视频的播放列表大约需要 25 分钟。浏览器在约 60 秒后超时,UI 卡死,应用最终崩溃。
为什么使用任务队列和后台工作者?
完全同步的设计会阻塞主请求‑响应循环:
- 长时间运行的操作会占用 Web 服务器线程。
- 请求堆积,延迟增加,系统可能会掉线或崩溃。
任务队列将工作从请求路径中移出:
- HTTP 处理器记录意图并 将任务放入队列。
- 它立即向客户端返回确认。
- 独立的 工作进程 从队列中消费任务并并行(或并发)执行它们。
好处
- Web 层保持响应。
- 只需增加更多工作者即可实现扩展。
- 工作者的失败不会蔓延到整个系统。
- 可以在队列层面进行限流(有助于避免 YouTube IP 封禁)。
核心概念
任务
单个工作单元。在本项目中,任务是 process_video_task,它:
- 接收视频 ID。
- 通过代理获取转录本。
- 将数据索引到 Elasticsearch。
生产者
创建任务的应用部分。这里,Flask 端点收集播放列表信息并 将作业发送到 Celery,即时卸载工作。
中间件(Broker)
存储任务并在生产者与消费者之间进行调解的软件。
Redis 同时用作消息中间件和任务状态存储,实现实时进度更新。
消费者(工作者)
从中间件拉取任务并执行的进程。Celery 管理工作者进程、内存、确认和重试。
实现细节
Celery 任务(Python)
# tasks.py
from celery import Celery
app = Celery('youtube_indexer', broker='redis://localhost:6379/0')
@app.task(bind=True, max_retries=3)
def process_video_task(self, video_data, index_name):
try:
transcript = get_video_transcript(video_data['id'])
if index_video(index_name, video_data, transcript):
return (video_data['id'], True)
except Exception as e:
# Celery will handle retries based on max_retries
raise self.retry(exc=e, countdown=60)
将任务 ID 存入 Redis
# utils.py
TASK_KEY_PREFIX = "playlist_task:"
task_id_key = f"{TASK_KEY_PREFIX}{playlist_id}"
redis_conn.set(task_id_key, task.id, ex=7200) # expires in 2 hours
生产者端点(Flask)
# routes.py
from flask import Blueprint, request, jsonify
from .tasks import process_video_task
bp = Blueprint('index', __name__)
@bp.post('/index_playlist')
def index_playlist():
data = request.json
playlist_id = data['playlist_id']
videos = get_videos_from_playlist(playlist_id)
for video in videos:
process_video_task.delay(video, index_name='my_index')
return jsonify({"status": "queued", "playlist_id": playlist_id})
其他语言生态系统
| 语言 | 队列库 | 中间件 |
|---|---|---|
| Node.js | BullMQ | Redis |
| Go | Asynq | Redis |
| Python | Celery | Redis(或 RabbitMQ 等) |
在不同生态系统中,角色(生产者、Broker、消费者)保持不变。
架构模式:Fan‑Out(编排器)
系统遵循 Fan‑Out / Orchestrator 模式:
- 管理器(Flask 端点)创建一个编排任务,例如
index_playlist_task。 - 该任务 分发 多个
process_video_task作业——每个视频一个。 - 工作者 fan out 并发处理视频。
这种分离使初始请求保持轻量,同时利用并行性完成繁重工作。
经验教训
- 将长时间运行的工作卸载到任务队列可以防止请求超时并保持 UI 响应。
- Redis 提供快速的内存中间件;Celery 增加了重试、监控和工作者管理所需的编排层。
- 相同的原则适用于各种语言——只需替换队列库,保持生产者‑Broker‑消费者模型不变。
通过围绕 Celery 和 Redis 重构应用,索引大型播放列表变得可靠、可扩展且对用户友好。