从 Celery/Redis 到 Temporal:迈向幂等性和可靠工作流的旅程

发布: (2026年1月8日 GMT+8 19:49)
5 分钟阅读
原文: Dev.to

Source: Dev.to

在分布式系统中处理异步任务时,Celery 与 Redis 的组合通常是首选。我也因为熟悉而在最初的 KYC(了解你的客户)编排器设计中选择了 Celery。然而,随着服务复杂度的提升,我遇到了一个巨大的障碍:保证幂等性以及管理复杂状态。

虽然 Celery 非常适合“发起即忘”的任务,但在网络故障或 worker 宕机导致重试时,重复执行的风险很高。对于 GPU 密集型的人脸识别任务,重复执行代价高昂,且会影响性能。

KYC流程

  1. 用户上传身份证图片。
  2. 用户上传自拍视频。
  3. 在两个文件都存在后比较人脸相似度。

在 Celery 环境中,由于我不知道图像和视频何时会被上传,我需要每次查询数据库或在 Redis 中存储中间状态的复杂逻辑。“所有文件是否已收集?”的检查散布在多个位置,导致维护困难。

为什么选择 Temporal?

Temporal 不仅仅是一个消息队列;它是一个有状态的工作流引擎。工作流代码必须是 可重放安全 的:对于相同的输入和历史记录,它必须产生相同的工作流 API 调用序列。因此,网络 I/O、文件 I/O、系统时间、随机性或线程等副作用必须移到 活动(activities) 中,而工作流本身专注于编排。

官方文档:

FaceSimilarityWorkflow 的核心逻辑

# workflow.py
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy

@workflow.run
async def run(self, data: SimilarityData) -> SimilarityResult:
    # Wait up to 1 hour until both image and video are collected
    await workflow.wait_condition(
        lambda: any(f["type"] == "image" for f in self._files)
        and any(f["type"] == "video" for f in self._files),
        timeout=timedelta(hours=1),
    )

    # Execute GPU activity once all files are ready
    result = await workflow.execute_activity(
        check_face_similarity_activity,
        data,
        retry_policy=RetryPolicy(maximum_attempts=3),
    )
    return result

workflow.wait_condition 会在条件满足之前挂起工作流,而不会阻塞事件循环——这在 Celery 中本需要复杂的轮询或 webhook 逻辑才能实现。

活动层面的幂等性

Temporal 将工作流进度记录为事件历史,因此工作者重启时会精确从上一次成功的点继续。活动(Activities)却采用 至少一次 的执行模型:如果工作者在完成工作后但在通知服务器之前崩溃,活动可能会被重试。因此官方文档强烈建议使活动具备幂等性。

官方文档:

双重防御策略

  1. 外部幂等键 – 将工作流执行 ID 与活动 ID 组合。
  2. 内部防护 – 在数据库中使用唯一键(或检查已有结果)以防止重复存储/处理。
# activities.py
from temporalio import activity

@activity.defn
async def check_face_similarity_activity(data: SimilarityData) -> SimilarityResult:
    info = activity.info()
    idempotency_key = f"{info.workflow_run_id}-{info.activity_id}"
    session_id = data["session_id"]

    with get_db_context() as db:
        existing = (
            db.query(FaceSimilarity)
            .filter(FaceSimilarity.idempotency_key == idempotency_key)
            .first()
        )
        if existing:
            return SimilarityResult(success=True, message="Already processed.")

    # Perform actual GPU‑intensive work...
    # (store result with the same idempotency_key)

对比:Celery/Redis 与 Temporal

特性Celery/RedisTemporal
状态管理手动存储在 DB/Redis 中由引擎自动管理
重试策略手动指数回退声明式重试策略
可见性必须通过日志挖掘在 Temporal UI 中检查历史
幂等性很难保证结构上可实现

要点

从 Celery 迁移到 Temporal 不仅仅是更换工具;它意味着重新定义业务流程在代码中的表达方式。在对幂等性要求极高的金融和认证系统中,Temporal 提供了无可替代的稳定性。

如果你因复杂的异步逻辑和幂等性问题而辗转难眠,我强烈建议迁移到 Temporal。

Back to Blog

相关文章

阅读更多 »

你好,我是新人。

嗨!我又回到 STEM 的领域了。我也喜欢学习能源系统、科学、技术、工程和数学。其中一个项目是…