重放模型:AWS Lambda Durable Functions 的实际工作原理

发布: (2025年12月3日 GMT+8 08:27)
6 min read
原文: Dev.to

Source: Dev.to

核心原理

你的处理函数在每次调用时都会从头重新执行,但已完成的操作会从检查点返回缓存结果,而不是重新执行。

async function processOrder(event: any, ctx: DurableContext) {
  const order = await ctx.step('create-order', async () => {
    console.log('Creating order...');
    return { orderId: '123', total: 50 };
  });

  const payment = await ctx.step('process-payment', async () => {
    console.log('Processing payment...');
    return { transactionId: 'txn-456', status: 'success' };
  });

  await ctx.wait({ seconds: 300 }); // Wait 5 minutes

  const notification = await ctx.step('send-notification', async () => {
    console.log('Sending notification...');
    return { sent: true };
  });

  return { order, payment, notification };
}

调用流程

调用 1 (t = 0 s)

Creating order...
Processing payment...
[Checkpoint: create-order completed]
[Checkpoint: process-payment completed]
[Function terminates – waiting 5 minutes]

调用 2 (t = 300 s, 等待完成后)

[REPLAY MODE: Skipping create-order – returning cached result]
[REPLAY MODE: Skipping process-payment – returning cached result]
[EXECUTION MODE: Running send-notification]
Sending notification...
[Checkpoint: send-notification completed]
[Function completes]

函数每次都会从头开始,但已完成的步骤会被跳过,所以日志只会出现一次。

执行模式:秘密调味料

SDK 以两种自动模式运行:

  • ExecutionMode – 第一次执行操作;结果会保存到检查点,日志会输出,副作用会发生。
  • ReplayMode – 重放已完成的操作;立即返回缓存结果,日志被抑制,且不会产生副作用。

当 SDK 遇到尚未完成的操作时,会从 ReplayMode 切换到 ExecutionMode

检查点的工作原理

每个操作都会创建一个检查点,存储元数据和结果:

{
  "operationId": "2",
  "operationType": "STEP",
  "operationName": "process-payment",
  "status": "SUCCEEDED",
  "result": {
    "transactionId": "txn-456",
    "status": "success"
  }
}

函数重新启动时,SDK 会加载所有检查点,以操作 ID 为索引,对已完成的操作返回缓存结果,对新操作正常执行。

确定性要求

重放要求你的代码是 确定性的——每次调用必须以相同的顺序执行相同的操作序列。

什么会破坏确定性

// ❌ 随机控制流
if (Math.random() > 0.5) {
  await ctx.step('optional-step', async () => doSomething());
}

// ❌ 基于时间的分支
const isWeekend = new Date().getDay() >= 5;
if (isWeekend) {
  await ctx.step('weekend-task', async () => doWeekendWork());
}

// ❌ 外部可变状态
let counter = 0;
await ctx.step('step1', async () => {
  counter++; // 重放时不会递增!
  return counter;
});

这些模式会导致运行之间的操作序列不匹配,从而触发重放一致性违规。

如何编写确定性代码

在步骤内部捕获所有非确定性值:

// ✅ 在步骤中捕获随机值
const randomId = await ctx.step('generate-id', async () => {
  return crypto.randomUUID(); // 只执行一次,重放时使用缓存
});

// ✅ 在步骤中捕获时间戳
const timestamp = await ctx.step('get-timestamp', async () => {
  return Date.now(); // 每次重放得到相同的时间戳
});

// ✅ 使用事件数据进行控制流(确定性)
if (event.shouldProcess) {
  await ctx.step('process', async () => doWork());
}

// ✅ 在步骤中捕获基于时间的决策
const isWeekend = await ctx.step('check-day', async () => {
  return new Date().getDay() >= 5;
});
if (isWeekend) {
  await ctx.step('weekend-task', async () => doWeekendWork());
}

重放一致性校验

SDK 会验证每次调用是否遵循相同的操作序列:

  • 操作类型(STEP、WAIT、INVOKE)
  • 操作名称(你的标识符)
  • 操作位置(顺序)

示例校验错误:

Replay consistency violation: Expected operation 'process-payment' 
of type STEP at position 2, but found operation 'send-email' of type STEP

这种提前检测帮助你发现非确定性 bug。

完整示例:带重放的订单处理

async function processOrder(event: any, ctx: DurableContext) {
  ctx.logger.info('Order processing started', { orderId: event.orderId });

  // Step 1: Validate inventory
  const inventory = await ctx.step('check-inventory', async () => {
    ctx.logger.info('Checking inventory');
    const response = await fetch(`https://api.inventory.com/check`, {
      method: 'POST',
      body: JSON.stringify({ items: event.items })
    });
    return response.json();
  });

  if (!inventory.available) {
    ctx.logger.warn('Out of stock', { missing: inventory.missing });
    return { status: 'out-of-stock' };
  }

  // Step 2: Process payment
  const payment = await ctx.step('process-payment', async () => {
    ctx.logger.info('Processing payment', { amount: inventory.total });
    const response = await fetch(`https://api.payments.com/charge`, {
      method: 'POST',
      body: JSON.stringify({
        customerId: event.customerId,
        amount: inventory.total
      })
    });
    return response.json();
  });

  // Step 3: Wait for warehouse confirmation (5‑minute timeout)
  ctx.logger.info('Waiting for warehouse confirmation');
  const confirmation = await ctx.waitForCallback(
    'warehouse-confirm',
    async (callbackId) => {
      // Send callback ID to warehouse system
      await fetch(`https://api.warehouse.com/notify`, {
        method: 'POST',
        body: JSON.stringify({ orderId: event.orderId, callbackId })
      });
    },
    { timeout: { seconds: 300 } }
  );

  // Step 4: Send notification
  const notification = await ctx.step('send-notification', async () => {
    ctx.logger.info('Sending notification');
    // e.g., call an email service
    await fetch(`https://api.email.com/send`, {
      method: 'POST',
      body: JSON.stringify({
        to: event.customerEmail,
        subject: 'Your order is confirmed',
        body: `Order ${event.orderId} is confirmed.`
      })
    });
    return { sent: true };
  });

  return { inventory, payment, confirmation, notification };
}

在此工作流中,每一步都会被检查点记录。如果函数因等待(例如等待仓库回调)而暂停,后续调用会瞬间重放之前的步骤,确保副作用仅执行一次。

Back to Blog

相关文章

阅读更多 »

开源邮件预热:完整指南

引言 开源电子邮件预热是逐步与邮箱提供商建立信任的过程,使您的邮件进入收件箱,而不是垃圾邮件文件夹....