使用 Celery 和 Redis 进行扩展

发布: (2025年12月2日 GMT+8 17:57)
5 min read
原文: Dev.to

Source: Dev.to

同步索引的问题

当用户开始索引 YouTube 播放列表时,后端运行了一个 同步循环,逐个处理每个视频。
获取并索引转录本大约需要 1.5 秒,因此包含 1,000 个视频的播放列表大约需要 25 分钟。浏览器在约 60 秒后超时,UI 卡死,应用最终崩溃。

为什么使用任务队列和后台工作者?

完全同步的设计会阻塞主请求‑响应循环:

  • 长时间运行的操作会占用 Web 服务器线程。
  • 请求堆积,延迟增加,系统可能会掉线或崩溃。

任务队列将工作从请求路径中移出:

  1. HTTP 处理器记录意图并 将任务放入队列
  2. 它立即向客户端返回确认。
  3. 独立的 工作进程 从队列中消费任务并并行(或并发)执行它们。

好处

  • Web 层保持响应。
  • 只需增加更多工作者即可实现扩展。
  • 工作者的失败不会蔓延到整个系统。
  • 可以在队列层面进行限流(有助于避免 YouTube IP 封禁)。

核心概念

任务

单个工作单元。在本项目中,任务是 process_video_task,它:

  • 接收视频 ID。
  • 通过代理获取转录本。
  • 将数据索引到 Elasticsearch。

生产者

创建任务的应用部分。这里,Flask 端点收集播放列表信息并 将作业发送到 Celery,即时卸载工作。

中间件(Broker)

存储任务并在生产者与消费者之间进行调解的软件。
Redis 同时用作消息中间件和任务状态存储,实现实时进度更新。

消费者(工作者)

从中间件拉取任务并执行的进程。Celery 管理工作者进程、内存、确认和重试。

实现细节

Celery 任务(Python)

# tasks.py
from celery import Celery

app = Celery('youtube_indexer', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def process_video_task(self, video_data, index_name):
    try:
        transcript = get_video_transcript(video_data['id'])
        if index_video(index_name, video_data, transcript):
            return (video_data['id'], True)
    except Exception as e:
        # Celery will handle retries based on max_retries
        raise self.retry(exc=e, countdown=60)

将任务 ID 存入 Redis

# utils.py
TASK_KEY_PREFIX = "playlist_task:"
task_id_key = f"{TASK_KEY_PREFIX}{playlist_id}"
redis_conn.set(task_id_key, task.id, ex=7200)  # expires in 2 hours

生产者端点(Flask)

# routes.py
from flask import Blueprint, request, jsonify
from .tasks import process_video_task

bp = Blueprint('index', __name__)

@bp.post('/index_playlist')
def index_playlist():
    data = request.json
    playlist_id = data['playlist_id']
    videos = get_videos_from_playlist(playlist_id)

    for video in videos:
        process_video_task.delay(video, index_name='my_index')

    return jsonify({"status": "queued", "playlist_id": playlist_id})

其他语言生态系统

语言队列库中间件
Node.jsBullMQRedis
GoAsynqRedis
PythonCeleryRedis(或 RabbitMQ 等)

在不同生态系统中,角色(生产者、Broker、消费者)保持不变。

架构模式:Fan‑Out(编排器)

系统遵循 Fan‑Out / Orchestrator 模式:

  1. 管理器(Flask 端点)创建一个编排任务,例如 index_playlist_task
  2. 该任务 分发 多个 process_video_task 作业——每个视频一个。
  3. 工作者 fan out 并发处理视频。

这种分离使初始请求保持轻量,同时利用并行性完成繁重工作。

经验教训

  • 将长时间运行的工作卸载到任务队列可以防止请求超时并保持 UI 响应。
  • Redis 提供快速的内存中间件;Celery 增加了重试、监控和工作者管理所需的编排层。
  • 相同的原则适用于各种语言——只需替换队列库,保持生产者‑Broker‑消费者模型不变。

通过围绕 Celery 和 Redis 重构应用,索引大型播放列表变得可靠、可扩展且对用户友好。

Back to Blog

相关文章

阅读更多 »