写得好:使用 Redis、Lua 和 Go 实现原子性和幂等性

发布: (2025年11月30日 GMT+8 14:53)
9 min read
原文: Dev.to

Source: Dev.to

引言

如果世界里只有单体函数——一次执行的简单函数,崩溃后我们可以再试一次,生活会很轻松。
但我们构建的是分布式系统,事情没有那么简单(虽然很有趣)。

一个经典场景:用户点击 “立即支付”。后端必须:

  1. 从用户钱包中扣除余额(Postgres)。
  2. 发布事件,发送确认邮件并通知仓库(Redis/Kafka)。

如果数据库提交成功但网络在事件发布前失败,用户被扣费但邮件从未发送,仓库也不会发货。反向操作又会导致相反的问题。这就是 双写问题,微服务中数据完整性的隐形杀手。

为了解决它,我们需要两个架构支柱:

  • 原子性 – 确保数据库写入和事件发布一起发生。
  • 幂等性 – 保证重复点击只产生一次扣费。

我们将使用 GoRedisPostgres 构建一个健壮的后端,实现 事务性 Outbox 模式 来保证原子性,并使用 Redis 实现幂等性。

完整、可运行的源码已在 GitHub 上提供:
HERE

事务性 Outbox 模式

我们必须停止在一次请求中把数据库和消息中间件当作独立实体来对待。由于 Redis 不能参与 Postgres 事务,我们把消息队列 带到 数据库中。

事务性 Outbox 模式:不是直接发布到 Redis,而是在同一个修改业务数据的事务里向本地 SQL 表 (outbox) 插入消息。随后后台工作者会把这些消息投递到 Redis。

架构概览

HLD

数据库模式

-- Business Table: stores the actual state
CREATE TABLE orders (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    amount INT NOT NULL,
    status VARCHAR(50) DEFAULT 'PENDING',
    created_at TIMESTAMP DEFAULT NOW()
);

-- Outbox Table: stores the intent to publish
CREATE TABLE outbox (
    id UUID PRIMARY KEY,
    event_type VARCHAR(255) NOT NULL,   -- e.g., "order.created"
    payload JSONB NOT NULL,              -- data to publish
    status VARCHAR(50) DEFAULT 'PENDING',
    created_at TIMESTAMP DEFAULT NOW()
);

Go 实现 – CreateOrder

type Order struct {
    ID     uuid.UUID `json:"id"`
    UserID uuid.UUID `json:"user_id"`
    Amount int       `json:"amount"`
}

func CreateOrder(ctx context.Context, db *sql.DB, order Order) error {
    // 1. Start the transaction (atomicity boundary)
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("failed to begin transaction: %w", err)
    }
    defer tx.Rollback()

    // 2. Insert the business record
    _, err = tx.ExecContext(ctx,
        `INSERT INTO orders (id, user_id, amount) VALUES ($1, $2, $3)`,
        order.ID, order.UserID, order.Amount)
    if err != nil {
        return fmt.Errorf("failed to insert order: %w", err)
    }

    // 3. Insert the outbox record
    payload, err := json.Marshal(order)
    if err != nil {
        return fmt.Errorf("failed to marshal payload: %w", err)
    }

    _, err = tx.ExecContext(ctx,
        `INSERT INTO outbox (id, event_type, payload) VALUES ($1, $2, $3)`,
        uuid.New(), "order.created", payload)
    if err != nil {
        return fmt.Errorf("failed to insert outbox event: %w", err)
    }

    // 4. Commit the transaction
    if err := tx.Commit(); err != nil {
        return fmt.Errorf("failed to commit transaction: %w", err)
    }

    return nil
}

这个函数只操作 Postgres。订单和 outbox 条目一起持久化——要么全部成功,要么全部失败(原子性)。

守护者:实现幂等性

在分布式系统中,网络故障会导致客户端重试请求,进而产生重复处理的风险。我们使用 幂等键(例如在 Idempotency-Key 头部发送的 UUID)并将其存储在 Redis 中,利用其快速原子操作。

幂等键的三种状态

状态含义
Null(新)之前未见过该键 – 加锁并继续。
PENDING另一个请求正在处理该键 – 返回 409 Conflict
JSON Payload请求已完成 – 返回缓存的响应。

为什么使用 Lua?

朴素的检查‑后‑设置方式会产生竞争:

// BAD CODE: DO NOT USE
if redis.Get(key) == "" {
    redis.Set(key, "PENDING")
    // ... process ...
}

两个并发请求可能同时看到键为空并继续执行,导致重复扣费。Redis Lua 脚本以原子方式执行,消除这种竞争。

Lua 脚本 (script.lua)

-- script.lua
local key = KEYS[1]
local pending_status = ARGV[1] -- "PENDING"
local ttl = ARGV[2]            -- expiration in seconds

-- 1. Check if key exists
local value = redis.call("GET", key)

-- 2. If it exists, return the value (could be "PENDING" or final JSON)
if value then
    return value
end

-- 3. If not, lock it with "PENDING" and a TTL
redis.call("SET", key, pending_status, "EX", ttl)
return nil

TTL 充当 “死人的开关”:如果服务器在设置 PENDING 后崩溃,键会自动过期,客户端可以重新尝试。

带幂等守护的 HTTP 处理器

func HandleCreateOrder(w http.ResponseWriter, r *http.Request, db *sql.DB, rdb *redis.Client) {
    // 1. Get Idempotency Key
    idempotencyKey := r.Header.Get("Idempotency-Key")
    if idempotencyKey == "" {
        http.Error(w, "Missing Idempotency-Key", http.StatusBadRequest)
        return
    }

    // 2. Execute Lua guard
    ctx := r.Context()
    val, err := rdb.Eval(ctx, luaScript, []string{idempotencyKey}, "PENDING", 60).Result()
    if err != nil && err != redis.Nil {
        http.Error(w, "Internal Server Error", http.StatusInternalServerError)
        return
    }

    // CASE A: Another request is processing
    if val == "PENDING" {
        http.Error(w, "Request is processing, please retry shortly", http.StatusConflict)
        return
    }

    // CASE B: Request already completed
    if val != nil {
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(fmt.Sprintf("%v", val)))
        return
    }

    // CASE C: New request – lock acquired
    orderID := uuid.New()
    order := Order{ID: orderID, UserID: uuid.New(), Amount: 1000}

    // Call the atomic transaction
    if err := CreateOrder(ctx, db, order); err != nil {
        // Remove lock on failure
        rdb.Del(ctx, idempotencyKey)
        http.Error(w, "Transaction failed", http.StatusInternalServerError)
        return
    }

    // 3. Store final result in Redis (cached for 24h)
    response := fmt.Sprintf(`{"status":"success","order_id":"%s"}`, orderID)
    rdb.Set(ctx, idempotencyKey, response, 24*time.Hour)

    w.Header().Set("Content-Type", "application/json")
    w.Write([]byte(response))
}

现在我们拥有:

  • 数据库 通过 SQL 事务受保护(原子性)。
  • API 通过 Redis Lua 脚本受保护(幂等性)。

背景工作者

CreateOrder 函数在 outbox 表中留下事件。后台工作者(“Courier”)轮询该表并将消息发布到 Redis。

扩展工作者

当多个服务副本运行时,它们可能都会获取相同的待处理事件。为避免重复处理,我们使用 PostgreSQL 的 FOR UPDATE SKIP LOCKED 子句,它会为某个工作者锁定行,并跳过已经被其他工作者锁定的行。

工作者实现

func StartOutboxWorker(ctx context.Context, db *sql.DB, rdb *redis.Client) {
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            processBatch(ctx, db, rdb)
        }
    }
}

func processBatch(ctx context.Context, db *sql.DB, rdb *redis.Client) {
    // 1. Start a transaction
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        log.Printf("Worker failed to begin tx: %v", err)
        return
    }
    defer tx.Rollback()

    // 2. Fetch pending events with SKIP LOCKED
    rows, err := tx.QueryContext(ctx, `
        SELECT id, event_type, payload
        FROM outbox
        WHERE status = 'PENDING'
        ORDER BY created_at ASC
        LIMIT 10
        FOR UPDATE SKIP LOCKED
    `)
    if err != nil {
        log.Printf("Worker query failed: %v", err)
        return
    }
    defer rows.Close()

    for rows.Next() {
        var id uuid.UUID
        var eventType string
        var payload []byte

        if err := rows.Scan(&id, &eventType, &payload); err != nil {
            continue
        }

        // 3. Publish to Redis
        if err := rdb.Publish(ctx, eventType, payload).Err(); err != nil {
            log.Printf("Failed to publish event %s: %v", id, err)
            return // transaction will roll back, event stays PENDING
        }

        // 4. Mark as processed
        _, err = tx.ExecContext(ctx,
            "UPDATE outbox SET status = 'PROCESSED' WHERE id = $1", id)
        if err != nil {
            return
        }
    }

    // 5. Commit the batch
    if err := tx.Commit(); err != nil {
        log.Printf("Worker failed to commit: %v", err)
    }
}

健壮性保证

  • 至少一次投递 – 如果发布失败,事务回滚,事件保持 PENDING,稍后会重试。
  • 并发安全SKIP LOCKED 让多个工作者并行运行而不会处理同一事件两次。

结论

通过将 事务性 Outbox 模式幂等性(基于 Redis Lua 脚本)相结合,我们在分布式环境中重新获得了单体函数的简洁性。

  • 数据库写入和事件发布是原子的。
  • 重复的客户端请求被安全去重。
  • 背景工作者可靠地投递 outbox 事件,并能平滑扩展。

可靠性不是寄望于永不出错,而是构建能够优雅处理故障的系统。

Back to Blog

相关文章

阅读更多 »

第1276天:职业攀升

星期六 在前往车站之前,我在当前的副项目上写了一些代码。取得了相当不错的进展,然后该出发了。Made i...

无状态 AI 应用背后的架构

项目一开始就做了一个看似冒险的决定:不使用后端数据库。当时并不需要持久化用户数据——获取用户的响应就是……

JWT Token 验证器挑战

概述 2019 年,Django 的会话管理框架中包含一个细微但灾难性的漏洞 CVE‑2019‑11358。该框架未能正确 inv...