写得好:使用 Redis、Lua 和 Go 实现原子性和幂等性
Source: Dev.to
引言
如果世界里只有单体函数——一次执行的简单函数,崩溃后我们可以再试一次,生活会很轻松。
但我们构建的是分布式系统,事情没有那么简单(虽然很有趣)。
一个经典场景:用户点击 “立即支付”。后端必须:
- 从用户钱包中扣除余额(Postgres)。
- 发布事件,发送确认邮件并通知仓库(Redis/Kafka)。
如果数据库提交成功但网络在事件发布前失败,用户被扣费但邮件从未发送,仓库也不会发货。反向操作又会导致相反的问题。这就是 双写问题,微服务中数据完整性的隐形杀手。
为了解决它,我们需要两个架构支柱:
- 原子性 – 确保数据库写入和事件发布一起发生。
- 幂等性 – 保证重复点击只产生一次扣费。
我们将使用 Go、Redis 和 Postgres 构建一个健壮的后端,实现 事务性 Outbox 模式 来保证原子性,并使用 Redis 实现幂等性。
完整、可运行的源码已在 GitHub 上提供:
HERE
事务性 Outbox 模式
我们必须停止在一次请求中把数据库和消息中间件当作独立实体来对待。由于 Redis 不能参与 Postgres 事务,我们把消息队列 带到 数据库中。
事务性 Outbox 模式:不是直接发布到 Redis,而是在同一个修改业务数据的事务里向本地 SQL 表 (outbox) 插入消息。随后后台工作者会把这些消息投递到 Redis。
架构概览

数据库模式
-- Business Table: stores the actual state
CREATE TABLE orders (
id UUID PRIMARY KEY,
user_id UUID NOT NULL,
amount INT NOT NULL,
status VARCHAR(50) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT NOW()
);
-- Outbox Table: stores the intent to publish
CREATE TABLE outbox (
id UUID PRIMARY KEY,
event_type VARCHAR(255) NOT NULL, -- e.g., "order.created"
payload JSONB NOT NULL, -- data to publish
status VARCHAR(50) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT NOW()
);
Go 实现 – CreateOrder
type Order struct {
ID uuid.UUID `json:"id"`
UserID uuid.UUID `json:"user_id"`
Amount int `json:"amount"`
}
func CreateOrder(ctx context.Context, db *sql.DB, order Order) error {
// 1. Start the transaction (atomicity boundary)
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
// 2. Insert the business record
_, err = tx.ExecContext(ctx,
`INSERT INTO orders (id, user_id, amount) VALUES ($1, $2, $3)`,
order.ID, order.UserID, order.Amount)
if err != nil {
return fmt.Errorf("failed to insert order: %w", err)
}
// 3. Insert the outbox record
payload, err := json.Marshal(order)
if err != nil {
return fmt.Errorf("failed to marshal payload: %w", err)
}
_, err = tx.ExecContext(ctx,
`INSERT INTO outbox (id, event_type, payload) VALUES ($1, $2, $3)`,
uuid.New(), "order.created", payload)
if err != nil {
return fmt.Errorf("failed to insert outbox event: %w", err)
}
// 4. Commit the transaction
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
这个函数只操作 Postgres。订单和 outbox 条目一起持久化——要么全部成功,要么全部失败(原子性)。
守护者:实现幂等性
在分布式系统中,网络故障会导致客户端重试请求,进而产生重复处理的风险。我们使用 幂等键(例如在 Idempotency-Key 头部发送的 UUID)并将其存储在 Redis 中,利用其快速原子操作。
幂等键的三种状态
| 状态 | 含义 |
|---|---|
| Null(新) | 之前未见过该键 – 加锁并继续。 |
| PENDING | 另一个请求正在处理该键 – 返回 409 Conflict。 |
| JSON Payload | 请求已完成 – 返回缓存的响应。 |
为什么使用 Lua?
朴素的检查‑后‑设置方式会产生竞争:
// BAD CODE: DO NOT USE
if redis.Get(key) == "" {
redis.Set(key, "PENDING")
// ... process ...
}
两个并发请求可能同时看到键为空并继续执行,导致重复扣费。Redis Lua 脚本以原子方式执行,消除这种竞争。
Lua 脚本 (script.lua)
-- script.lua
local key = KEYS[1]
local pending_status = ARGV[1] -- "PENDING"
local ttl = ARGV[2] -- expiration in seconds
-- 1. Check if key exists
local value = redis.call("GET", key)
-- 2. If it exists, return the value (could be "PENDING" or final JSON)
if value then
return value
end
-- 3. If not, lock it with "PENDING" and a TTL
redis.call("SET", key, pending_status, "EX", ttl)
return nil
TTL 充当 “死人的开关”:如果服务器在设置 PENDING 后崩溃,键会自动过期,客户端可以重新尝试。
带幂等守护的 HTTP 处理器
func HandleCreateOrder(w http.ResponseWriter, r *http.Request, db *sql.DB, rdb *redis.Client) {
// 1. Get Idempotency Key
idempotencyKey := r.Header.Get("Idempotency-Key")
if idempotencyKey == "" {
http.Error(w, "Missing Idempotency-Key", http.StatusBadRequest)
return
}
// 2. Execute Lua guard
ctx := r.Context()
val, err := rdb.Eval(ctx, luaScript, []string{idempotencyKey}, "PENDING", 60).Result()
if err != nil && err != redis.Nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// CASE A: Another request is processing
if val == "PENDING" {
http.Error(w, "Request is processing, please retry shortly", http.StatusConflict)
return
}
// CASE B: Request already completed
if val != nil {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(fmt.Sprintf("%v", val)))
return
}
// CASE C: New request – lock acquired
orderID := uuid.New()
order := Order{ID: orderID, UserID: uuid.New(), Amount: 1000}
// Call the atomic transaction
if err := CreateOrder(ctx, db, order); err != nil {
// Remove lock on failure
rdb.Del(ctx, idempotencyKey)
http.Error(w, "Transaction failed", http.StatusInternalServerError)
return
}
// 3. Store final result in Redis (cached for 24h)
response := fmt.Sprintf(`{"status":"success","order_id":"%s"}`, orderID)
rdb.Set(ctx, idempotencyKey, response, 24*time.Hour)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(response))
}
现在我们拥有:
- 数据库 通过 SQL 事务受保护(原子性)。
- API 通过 Redis Lua 脚本受保护(幂等性)。
背景工作者
CreateOrder 函数在 outbox 表中留下事件。后台工作者(“Courier”)轮询该表并将消息发布到 Redis。
扩展工作者
当多个服务副本运行时,它们可能都会获取相同的待处理事件。为避免重复处理,我们使用 PostgreSQL 的 FOR UPDATE SKIP LOCKED 子句,它会为某个工作者锁定行,并跳过已经被其他工作者锁定的行。
工作者实现
func StartOutboxWorker(ctx context.Context, db *sql.DB, rdb *redis.Client) {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
processBatch(ctx, db, rdb)
}
}
}
func processBatch(ctx context.Context, db *sql.DB, rdb *redis.Client) {
// 1. Start a transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
log.Printf("Worker failed to begin tx: %v", err)
return
}
defer tx.Rollback()
// 2. Fetch pending events with SKIP LOCKED
rows, err := tx.QueryContext(ctx, `
SELECT id, event_type, payload
FROM outbox
WHERE status = 'PENDING'
ORDER BY created_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED
`)
if err != nil {
log.Printf("Worker query failed: %v", err)
return
}
defer rows.Close()
for rows.Next() {
var id uuid.UUID
var eventType string
var payload []byte
if err := rows.Scan(&id, &eventType, &payload); err != nil {
continue
}
// 3. Publish to Redis
if err := rdb.Publish(ctx, eventType, payload).Err(); err != nil {
log.Printf("Failed to publish event %s: %v", id, err)
return // transaction will roll back, event stays PENDING
}
// 4. Mark as processed
_, err = tx.ExecContext(ctx,
"UPDATE outbox SET status = 'PROCESSED' WHERE id = $1", id)
if err != nil {
return
}
}
// 5. Commit the batch
if err := tx.Commit(); err != nil {
log.Printf("Worker failed to commit: %v", err)
}
}
健壮性保证
- 至少一次投递 – 如果发布失败,事务回滚,事件保持
PENDING,稍后会重试。 - 并发安全 –
SKIP LOCKED让多个工作者并行运行而不会处理同一事件两次。
结论
通过将 事务性 Outbox 模式 与 幂等性(基于 Redis Lua 脚本)相结合,我们在分布式环境中重新获得了单体函数的简洁性。
- 数据库写入和事件发布是原子的。
- 重复的客户端请求被安全去重。
- 背景工作者可靠地投递 outbox 事件,并能平滑扩展。
可靠性不是寄望于永不出错,而是构建能够优雅处理故障的系统。