从 Celery/Redis 到 Temporal:迈向幂等性和可靠工作流的旅程
Source: Dev.to
在分布式系统中处理异步任务时,Celery 与 Redis 的组合通常是首选。我也因为熟悉而在最初的 KYC(了解你的客户)编排器设计中选择了 Celery。然而,随着服务复杂度的提升,我遇到了一个巨大的障碍:保证幂等性以及管理复杂状态。
虽然 Celery 非常适合“发起即忘”的任务,但在网络故障或 worker 宕机导致重试时,重复执行的风险很高。对于 GPU 密集型的人脸识别任务,重复执行代价高昂,且会影响性能。
KYC流程
- 用户上传身份证图片。
- 用户上传自拍视频。
- 在两个文件都存在后比较人脸相似度。
在 Celery 环境中,由于我不知道图像和视频何时会被上传,我需要每次查询数据库或在 Redis 中存储中间状态的复杂逻辑。“所有文件是否已收集?”的检查散布在多个位置,导致维护困难。
为什么选择 Temporal?
Temporal 不仅仅是一个消息队列;它是一个有状态的工作流引擎。工作流代码必须是 可重放安全 的:对于相同的输入和历史记录,它必须产生相同的工作流 API 调用序列。因此,网络 I/O、文件 I/O、系统时间、随机性或线程等副作用必须移到 活动(activities) 中,而工作流本身专注于编排。
官方文档:
FaceSimilarityWorkflow 的核心逻辑
# workflow.py
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy
@workflow.run
async def run(self, data: SimilarityData) -> SimilarityResult:
# Wait up to 1 hour until both image and video are collected
await workflow.wait_condition(
lambda: any(f["type"] == "image" for f in self._files)
and any(f["type"] == "video" for f in self._files),
timeout=timedelta(hours=1),
)
# Execute GPU activity once all files are ready
result = await workflow.execute_activity(
check_face_similarity_activity,
data,
retry_policy=RetryPolicy(maximum_attempts=3),
)
return result
workflow.wait_condition 会在条件满足之前挂起工作流,而不会阻塞事件循环——这在 Celery 中本需要复杂的轮询或 webhook 逻辑才能实现。
活动层面的幂等性
Temporal 将工作流进度记录为事件历史,因此工作者重启时会精确从上一次成功的点继续。活动(Activities)却采用 至少一次 的执行模型:如果工作者在完成工作后但在通知服务器之前崩溃,活动可能会被重试。因此官方文档强烈建议使活动具备幂等性。
官方文档:
双重防御策略
- 外部幂等键 – 将工作流执行 ID 与活动 ID 组合。
- 内部防护 – 在数据库中使用唯一键(或检查已有结果)以防止重复存储/处理。
# activities.py
from temporalio import activity
@activity.defn
async def check_face_similarity_activity(data: SimilarityData) -> SimilarityResult:
info = activity.info()
idempotency_key = f"{info.workflow_run_id}-{info.activity_id}"
session_id = data["session_id"]
with get_db_context() as db:
existing = (
db.query(FaceSimilarity)
.filter(FaceSimilarity.idempotency_key == idempotency_key)
.first()
)
if existing:
return SimilarityResult(success=True, message="Already processed.")
# Perform actual GPU‑intensive work...
# (store result with the same idempotency_key)
对比:Celery/Redis 与 Temporal
| 特性 | Celery/Redis | Temporal |
|---|---|---|
| 状态管理 | 手动存储在 DB/Redis 中 | 由引擎自动管理 |
| 重试策略 | 手动指数回退 | 声明式重试策略 |
| 可见性 | 必须通过日志挖掘 | 在 Temporal UI 中检查历史 |
| 幂等性 | 很难保证 | 结构上可实现 |
要点
从 Celery 迁移到 Temporal 不仅仅是更换工具;它意味着重新定义业务流程在代码中的表达方式。在对幂等性要求极高的金融和认证系统中,Temporal 提供了无可替代的稳定性。
如果你因复杂的异步逻辑和幂等性问题而辗转难眠,我强烈建议迁移到 Temporal。