The Replay Model: How AWS Lambda Durable Functions Actually Work
Source: Dev.to
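Core Principle
Your handler re-runs from the top on every invocation, but operations that already completed return their cached result from a checkpoint instead of executing again.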
async function processOrder(event: any, ctx: DurableContext) {
  const order = await ctx.step('create-order', async () => {
    console.log('Creating order...');
    return { orderId: '123', total: 50 };
  });
  const payment = await ctx.step('process-payment', async () => {
    console.log('Processing payment...');
    return { transactionId: 'txn-456', status: 'success' };
  });
  await ctx.wait({ seconds: 300 }); // Wait 5 minutes
  const notification = await ctx.step('send-notification', async () => {
    console.log('Sending notification...');
    return { sent: true };
  });
  return { order, payment, notification };
}
Invocation Flow
Invocation 1 (t = 0 s)
Creating order...
[Checkpoint: create-order completed]
Processing payment...
[Checkpoint: process-payment completed]
[Function terminates – waiting 5 minutes]
Invocation 2 (t = 300 s, after the wait completes)
[REPLAY MODE: Skipping create-order – returning cached result]
[REPLAY MODE: Skipping process-payment – returning cached result]
[EXECUTION MODE: Running send-notification]
Sending notification...
[Checkpoint: send-notification completed]
[Function completes]
The function starts from the top on every invocation, but completed steps are skipped, so each step's log lines appear only once.
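The trace above can be reproduced with a toy model. The sketch below is not the real SDK: ToyContext, runInvocation, and SUSPEND are hypothetical names, the API is synchronous for brevity, and checkpoints live in a plain in-memory array. It only illustrates the idea that the handler re-runs from the top while completed steps return cached results.

```typescript
// Toy, synchronous model of replay. Hypothetical names; not the real SDK.
interface Checkpoint {
  name: string;
  result: unknown;
}

// Sentinel thrown to end an invocation early while a wait is pending.
const SUSPEND = { reason: "waiting" };

class ToyContext {
  private position = 0;
  private checkpoints: Checkpoint[];
  private waitDone: boolean;

  constructor(checkpoints: Checkpoint[], waitDone: boolean) {
    this.checkpoints = checkpoints;
    this.waitDone = waitDone;
  }

  step<T>(name: string, fn: () => T): T {
    const index = this.position++;
    if (index < this.checkpoints.length) {
      // Replay: the step already completed, so return its saved result.
      return this.checkpoints[index].result as T;
    }
    const result = fn(); // First execution: run the body...
    this.checkpoints.push({ name, result }); // ...then checkpoint the result.
    return result;
  }

  wait(): void {
    if (!this.waitDone) throw SUSPEND;
  }
}

const log: string[] = [];

function processOrder(ctx: ToyContext) {
  const order = ctx.step("create-order", () => {
    log.push("Creating order...");
    return { orderId: "123", total: 50 };
  });
  const payment = ctx.step("process-payment", () => {
    log.push("Processing payment...");
    return { transactionId: "txn-456", status: "success" };
  });
  ctx.wait();
  const notification = ctx.step("send-notification", () => {
    log.push("Sending notification...");
    return { sent: true };
  });
  return { order, payment, notification };
}

function runInvocation(store: Checkpoint[], waitDone: boolean) {
  try {
    return processOrder(new ToyContext(store, waitDone));
  } catch (e) {
    if (e === SUSPEND) return undefined; // suspended; checkpoints stay in `store`
    throw e;
  }
}

const store: Checkpoint[] = [];
const first = runInvocation(store, false); // runs two steps, then suspends
const second = runInvocation(store, true); // replays two steps, runs the third
```

After both runs, `log` holds each message exactly once even though `processOrder` was entered twice, which mirrors the two-invocation trace.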
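Execution Modes: The Secret Sauce
The SDK runs in one of two modes and switches between them automatically:
- ExecutionMode – the operation runs for the first time; its result is saved to a checkpoint, logs are emitted, and side effects happen.
- ReplayMode – a completed operation is replayed; the cached result is returned immediately, logs are suppressed, and no side effects occur.
The SDK switches from ReplayMode to ExecutionMode when it reaches an operation that has not completed yet.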
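One way to picture the switch is a counter compared against the number of checkpointed operations. The sketch below is an assumption about how such tracking could look, not the SDK's implementation; ModeTracker and its members are hypothetical, and only the two mode names come from the text.

```typescript
// Hypothetical sketch of mode tracking; only the mode names come from the article.
type Mode = "ReplayMode" | "ExecutionMode";

class ModeTracker {
  private position = 0;
  private readonly completedCount: number;
  readonly emitted: string[] = [];

  constructor(completedCount: number) {
    this.completedCount = completedCount;
  }

  // Replay while the next operation already has a checkpoint.
  get mode(): Mode {
    return this.position < this.completedCount ? "ReplayMode" : "ExecutionMode";
  }

  // Called once per operation, in order; returns the mode that operation ran in.
  next(): Mode {
    const current = this.mode;
    this.position++;
    return current;
  }

  // Replay-aware logging: messages are dropped while replaying.
  log(message: string): void {
    if (this.mode === "ExecutionMode") this.emitted.push(message);
  }
}

const tracker = new ModeTracker(2); // two operations were checkpointed earlier
tracker.log("Order processing started"); // suppressed: still replaying
const modes: Mode[] = [tracker.next(), tracker.next()];
tracker.log("Sending notification"); // emitted: the next operation is new
modes.push(tracker.next());
```

Here `modes` ends up as two ReplayMode entries followed by one ExecutionMode entry, and only the second log message is kept.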
How Checkpoints Work
Each operation creates a checkpoint that stores its metadata and result:
{
"operationId": "2",
"operationType": "STEP",
"operationName": "process-payment",
"status": "SUCCEEDED",
"result": {
"transactionId": "txn-456",
"status": "success"
}
}
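When the function restarts, the SDK loads all checkpoints, indexes them by operation ID, returns the cached result for each completed operation, and executes new operations normally.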
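The lookup described above might be sketched as follows. The field names follow the JSON record shown earlier; indexCheckpoints and resolveOperation are hypothetical helpers, and a real SDK would also persist the result of a newly executed operation, which this sketch leaves out.

```typescript
// Shape taken from the checkpoint JSON above; the helper functions are hypothetical.
interface Checkpoint {
  operationId: string;
  operationType: "STEP" | "WAIT" | "INVOKE";
  operationName: string;
  status: string; // the article only shows "SUCCEEDED"
  result?: unknown;
}

// Build an index keyed by operation ID, as the text describes.
function indexCheckpoints(list: Checkpoint[]): Map<string, Checkpoint> {
  return new Map(list.map((c): [string, Checkpoint] => [c.operationId, c]));
}

// Return the cached result for a completed operation, otherwise run it.
function resolveOperation<T>(
  byId: Map<string, Checkpoint>,
  operationId: string,
  run: () => T
): T {
  const checkpoint = byId.get(operationId);
  if (checkpoint !== undefined && checkpoint.status === "SUCCEEDED") {
    return checkpoint.result as T;
  }
  return run();
}

const loaded: Checkpoint[] = [
  {
    operationId: "1",
    operationType: "STEP",
    operationName: "create-order",
    status: "SUCCEEDED",
    result: { orderId: "123", total: 50 },
  },
  {
    operationId: "2",
    operationType: "STEP",
    operationName: "process-payment",
    status: "SUCCEEDED",
    result: { transactionId: "txn-456", status: "success" },
  },
];

const byId = indexCheckpoints(loaded);
let executed = 0;
// Operation 2 has a checkpoint, so the callback is never invoked.
const payment = resolveOperation(byId, "2", () => {
  executed++;
  return { transactionId: "new", status: "success" };
});
// Operation 3 is new, so the callback runs.
const notification = resolveOperation(byId, "3", () => {
  executed++;
  return { sent: true };
});
```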
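The Determinism Requirement
Replay requires your code to be deterministic: every invocation must perform the same sequence of operations in the same order.
What Breaks Determinism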
// ❌ Random control flow
if (Math.random() > 0.5) {
  await ctx.step('optional-step', async () => doSomething());
}
// ❌ Time-based branching
const day = new Date().getDay(); // 0 = Sunday, 6 = Saturday
const isWeekend = day === 0 || day === 6;
if (isWeekend) {
  await ctx.step('weekend-task', async () => doWeekendWork());
}
// ❌ Mutable state outside the step
let counter = 0;
await ctx.step('step1', async () => {
  counter++; // not incremented on replay, because the step body is skipped
  return counter;
});
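The first two patterns can produce a different operation sequence from one invocation to the next, which triggers a replay consistency violation. The third leaves in-memory state out of sync with the original run, because the step body that mutated it is skipped on replay.
How to Write Deterministic Code
Capture every non-deterministic value inside a step: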
// ✅ Capture random values inside a step
const randomId = await ctx.step('generate-id', async () => {
  return crypto.randomUUID(); // runs once; replays use the cached value
});
// ✅ Capture timestamps inside a step
const timestamp = await ctx.step('get-timestamp', async () => {
  return Date.now(); // every replay sees the same timestamp
});
// ✅ Branch on event data (deterministic)
if (event.shouldProcess) {
  await ctx.step('process', async () => doWork());
}
// ✅ Capture time-based decisions inside a step
const isWeekend = await ctx.step('check-day', async () => {
  const day = new Date().getDay(); // 0 = Sunday, 6 = Saturday
  return day === 0 || day === 6;
});
if (isWeekend) {
  await ctx.step('weekend-task', async () => doWeekendWork());
}
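To see why capturing a value inside a step matters, the following toy comparison reads a clock both outside and inside a step across two runs. It is a simplified sketch: makeStep is a hypothetical helper that caches by step name rather than by position, and `now` is a fake clock standing in for Date.now() so the outcome is predictable.

```typescript
// Hypothetical toy: a step helper that caches results by name.
type Step = <T>(stepName: string, fn: () => T) => T;

function makeStep(cache: Map<string, unknown>): Step {
  return function step<T>(stepName: string, fn: () => T): T {
    if (cache.has(stepName)) return cache.get(stepName) as T; // replay
    const value = fn(); // first execution
    cache.set(stepName, value);
    return value;
  };
}

// Fake clock that advances on every read; stands in for Date.now().
let clock = 1000;
const now = (): number => clock++;

function handler(step: Step) {
  const outside = now(); // re-evaluated on every invocation
  const inside = step("get-timestamp", now); // evaluated once, then cached
  return { outside, inside };
}

const cache = new Map<string, unknown>();
const run1 = handler(makeStep(cache)); // first invocation
const run2 = handler(makeStep(cache)); // replay with the same checkpoints
```

The `inside` value is identical in both runs, while `outside` drifts, which is exactly the kind of value that must not drive control flow.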
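Replay Consistency Validation
The SDK verifies that every invocation follows the same operation sequence. For each operation it checks:
- Operation type (STEP, WAIT, INVOKE)
- Operation name (your identifier)
- Operation position (order)
Example validation error: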
Replay consistency violation: Expected operation 'process-payment'
of type STEP at position 2, but found operation 'send-email' of type STEP
Failing fast like this helps you catch non-determinism bugs early.
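A check of this kind could be sketched as below. This is an assumption about the shape of the logic, not the SDK's code: OperationRecord and checkReplay are hypothetical, positions are treated as 1-based to match the sample error, and the message format is copied from that sample.

```typescript
// Hypothetical validator; the error text mirrors the sample message above.
interface OperationRecord {
  type: "STEP" | "WAIT" | "INVOKE";
  name: string;
}

function checkReplay(
  recorded: OperationRecord[],
  position: number, // 1-based, as in the sample error
  found: OperationRecord
): void {
  const expected = recorded[position - 1];
  if (expected === undefined) return; // new operation: nothing to compare against
  if (expected.type !== found.type || expected.name !== found.name) {
    throw new Error(
      `Replay consistency violation: Expected operation '${expected.name}' ` +
        `of type ${expected.type} at position ${position}, ` +
        `but found operation '${found.name}' of type ${found.type}`
    );
  }
}

const recorded: OperationRecord[] = [
  { type: "STEP", name: "create-order" },
  { type: "STEP", name: "process-payment" },
];

// A matching operation passes silently.
checkReplay(recorded, 1, { type: "STEP", name: "create-order" });

// A mismatch at position 2 is reported.
let violation = "";
try {
  checkReplay(recorded, 2, { type: "STEP", name: "send-email" });
} catch (e) {
  violation = (e as Error).message;
}
```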
Complete Example: Order Processing with Replay
async function processOrder(event: any, ctx: DurableContext) {
  ctx.logger.info('Order processing started', { orderId: event.orderId });
  // Step 1: Validate inventory
  const inventory = await ctx.step('check-inventory', async () => {
    ctx.logger.info('Checking inventory');
    const response = await fetch(`https://api.inventory.com/check`, {
      method: 'POST',
      body: JSON.stringify({ items: event.items })
    });
    return response.json();
  });
  if (!inventory.available) {
    ctx.logger.warn('Out of stock', { missing: inventory.missing });
    return { status: 'out-of-stock' };
  }
  // Step 2: Process payment
  const payment = await ctx.step('process-payment', async () => {
    ctx.logger.info('Processing payment', { amount: inventory.total });
    const response = await fetch(`https://api.payments.com/charge`, {
      method: 'POST',
      body: JSON.stringify({
        customerId: event.customerId,
        amount: inventory.total
      })
    });
    return response.json();
  });
  // Step 3: Wait for warehouse confirmation (5-minute timeout)
  ctx.logger.info('Waiting for warehouse confirmation');
  const confirmation = await ctx.waitForCallback(
    'warehouse-confirm',
    async (callbackId) => {
      // Send callback ID to warehouse system
      await fetch(`https://api.warehouse.com/notify`, {
        method: 'POST',
        body: JSON.stringify({ orderId: event.orderId, callbackId })
      });
    },
    { timeout: { seconds: 300 } }
  );
  // Step 4: Send notification
  const notification = await ctx.step('send-notification', async () => {
    ctx.logger.info('Sending notification');
    // e.g., call an email service
    await fetch(`https://api.email.com/send`, {
      method: 'POST',
      body: JSON.stringify({
        to: event.customerEmail,
        subject: 'Your order is confirmed',
        body: `Order ${event.orderId} is confirmed.`
      })
    });
    return { sent: true };
  });
  return { inventory, payment, confirmation, notification };
}
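Every step in this workflow is checkpointed. If the function is suspended while waiting (for example, for the warehouse callback), the next invocation replays the earlier steps immediately from their checkpoints, so their side effects run only once.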