UPDATE...RETURNING 在 MongoDB 中:ACID 与幂等性以及 findOneAndUpdate()

发布: (2026年1月5日 GMT+8 02:52)
9 min read
原文: Dev.to

Source: Dev.to

请提供您希望翻译的具体文本内容(文章正文),我将把它翻译成简体中文并保持原有的 Markdown 格式、代码块和技术术语不变。谢谢!

单次调用的原子读写操作

您可以在单次调用中实现原子读写操作,而无需显式事务,从而减少往返次数并缩小并发冲突的窗口。

  • PostgreSQL – 使用 UPDATE … RETURNING 替代先 SELECT FOR UPDATEUPDATE
  • MongoDB – 使用 findOneAndUpdate() 替代先 updateOne()find()

这使得在 MongoDB 中能够进行单个 ACID 读写操作,具备容错性并且可以安全重试,因为它是幂等的。

演示:两次取款和收据

场景 – Bob 使用借记卡取款(不允许出现负余额)。应用程序首先执行 updateOne,仅在余额足够时才扣减金额,然后再单独执行 find() 来打印包含余额的收据。在这两次调用之间,Alice 使用信用卡(允许出现负余额)从同一账户取款,导致 Bob 打印的余额与实际检查的余额不一致。

解决方案 – 使用 findOneAndUpdate() 原子地返回更新后的余额,以实现取款和余额读取的同步。

连接和集合

from pymongo import MongoClient, ReturnDocument
import threading
import time

# Connect to MongoDB
client = MongoClient("mongodb://127.0.0.1:27017/?directConnection=true")
db = client.bank
accounts = db.accounts

# Prepare test account: Bob & Alice share this account
def reset_account():
    accounts.drop()
    accounts.insert_one({
        "_id": "b0b4l3ce",
        "owner": ["Bob", "Alice"],
        "balance": 100
    })

reset_account()

文档视图

bank> db.accounts.find()
[ { _id: 'b0b4l3ce', owner: [ 'Bob', 'Alice' ], balance: 100 } ]

余额被设为 100

使用 updateOne() 后接 find() 的场景

Bob 的取款(借记卡)

def bob_withdraw_debit(amount):
    print("[Bob] Attempting debit withdrawal", amount)

    # Application logic checks balance then updates
    result = accounts.update_one(
        {"_id": "b0b4l3ce", "balance": {"$gte": amount}},   # must have enough money
        {"$inc": {"balance": -amount}}
    )

    # If no document was updated, the filter didn't find enough funds
    if result.modified_count == 0:
        print("[Bob] Withdrawal denied - insufficient funds")
        return

    # Simulate processing delay before printing the ticket
    time.sleep(1)

    # Query the balance for the receipt
    balance = accounts.find_one({"_id": "b0b4l3ce"})["balance"]
    print(f"[Bob] Debit card ticket: withdrew {amount}, balance after withdrawal: {balance}")

Alice 的取款(信用卡)

def alice_withdraw_credit(amount, delay=0):
    time.sleep(delay)          # let Bob start first
    print("[Alice] Attempting credit withdrawal", amount)

    # No balance check for credit cards
    accounts.update_one(
        {"_id": "b0b4l3ce"},
        {"$inc": {"balance": -amount}}
    )
    print("[Alice] Credit withdrawal complete")

演示脚本(竞争条件)

def demo():
    reset_account()
    t_bob = threading.Thread(target=bob_withdraw_debit, args=(80,))
    t_alice = threading.Thread(target=alice_withdraw_credit, args=(30, 0.5))  # starts just after Bob update
    t_bob.start()
    t_alice.start()
    t_bob.join()
    t_alice.join()

输出

>>> demo()
[Bob] Attempting debit withdrawal 80
[Alice] Attempting credit withdrawal 30
[Alice] Credit withdrawal complete
[Bob] Debit card ticket: withdrew 80, balance after withdrawal: -10

Bob 收到的票据显示借记卡余额为负数——这是一个错误 ❌。

使用 findOneAndUpdate()returnDocument = AFTER)的场景

Bob 的取款(借记卡)

def bob_withdraw_debit(amount):
    print("[Bob] Attempting debit withdrawal", amount)

    doc = accounts.find_one_and_update(
        {"_id": "b0b4l3ce", "balance": {"$gte": amount}},
        {"$inc": {"balance": -amount}},
        return_document=ReturnDocument.AFTER   # get post‑update document atomically
    )

    # No need to check the update count; we have the document if it was updated
    if not doc:
        print("[Bob] Withdrawal denied - insufficient funds")
        return

    # Ticket immediately shows consistent balance
    print(f"[Bob] Ticket: withdrew {amount}, balance after withdrawal: {doc['balance']}")

再次运行演示

>>> demo()
[Bob] Attempting debit withdrawal 80
[Bob] Ticket: withdrew 80, balance after withdrawal: 20
awal: 20
[Alice] Attempting credit withdrawal 30
[Alice] Credit withdrawal complete

Bob 现在收到一张票据,显示提款的精确时间点的余额 ✅。
更新写入和更新后读取在文档上作为单个原子操作执行,因而在更新与显示的读取结果之间没有其他写入的机会。

弹性

在 MongoDB 中,读取和写入 不会 像关系型数据库那样获取事务锁,但文档更新仍然在文档层面保持原子性,即使没有显式启动事务。MongoDB 在内部使用轻量级的文档级锁来确保单个文档的 ACID 保证,因为一次更新可能涉及多个内部读写操作(例如,强制唯一约束和更新索引)。

  • updateOne() 只返回元数据(例如更新的文档数量)。
  • findOneAndUpdate() 返回更新后的文档本身,读取和写入在单文档层面的同一次原子操作中完成。即使在出现故障的情况下,这种原子性也会被保留。

如果网络中断或主节点崩溃并且次节点被提升为主节点,MongoDB 驱动会在 可重试写入(retryable writes)机制下重新尝试该操作。由于重试必须是幂等的,findOneAndUpdate() 在重试时会返回相同的文档镜像。

为此,MongoDB 会在内部系统集合(config.image_collection)中存储文档镜像——可以是 after 镜像(如本例所示,使用 returnDocument: "after")或 before 镜像。该集合的复制独立于 oplog,作为同一事务的一部分进行:

// Switch to the config database
use config

// View the image collection
db.image_collection.find()
[
  {
    _id: {
      id: UUID('d04e10d6-c61d-42ad-9a44-5bb226a898d8'),
      uid: Binary.createFromBase64('47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=', 0)
    },
    txnNum: Long('15'),
    ts: Timestamp({ t: 1767478784, i: 5 }),
    imageKind: 'postImage',
    image: { _id: 'b0b4l3ce', owner: [ 'Bob', 'Alice' ], balance: 20 },
    invalidated: false
  }
]

当你启用写入重试时,内部会使用该镜像集合来使写操作对故障具有弹性。此处理对应用程序透明,并提供最强的一致性保证。

与 PostgreSQL 的比较

在 PostgreSQL 中,等价的操作如下:

CREATE TABLE accounts (
    id TEXT PRIMARY KEY,
    balance NUMERIC,
    owners TEXT[]
);

INSERT INTO accounts VALUES ('b0b4l3ce', ARRAY['Bob','Alice'], 100);

-- Alice 的事务
UPDATE accounts
SET balance = balance - 30
WHERE id = 'shared_account';

-- Bob 的事务
UPDATE accounts
SET balance = balance - 80
WHERE id = 'b0b4l3ce' AND balance >= 80
RETURNING balance AS new_balance;

PostgreSQL 驱动 不会 自动重试失败,它们依赖 MVCC 和锁来确保 ACID 属性。

  • 可重复读(Repeatable Read) 隔离级别(适用于因为写入 SET balance = balance - 80 依赖于读取 WHERE balance >= 80):

    • Bob 的事务在开始时获取快照。当与 Alice 并发执行时,它仍然看到 balance = 100

    • 如果 Alice 先提交并将余额降至 70,Bob 的事务会因以下错误而失败:

      ERROR: could not serialize access due to concurrent update
    • 应用程序必须自行处理,通过重试整个事务;驱动不会自动完成此操作。

  • 已提交读(Read Committed)(默认)隔离级别:

    • 当 Alice 的更新锁住行时,Bob 的事务会等待。
    • Alice 提交后,PostgreSQL 会重新评估 Bob 的 WHERE 子句。此时余额已是 70,不再满足 balance >= 80,于是 UPDATE 影响 0 行,取款被拒绝,避免出现负余额。
    • 这种方式在只影响单行时工作良好,但在多行语句中可能出现不一致的情况,因为它可能在不同事务状态的行上操作。

结论

MongoDB 支持多文档事务和单文档原子操作,但它 强烈建议 在可能的情况下使用单文档操作。如果您将模式设计为业务逻辑可以放在单个文档中,findOneAndUpdate() 可以:

  • 执行条件检查,
  • 应用更新,且
  • 原子地返回更新后的文档,

从而避免竞争条件异常并实现幂等重试。

在某些情况下——例如在之前的博客文章 FOR UPDATE SKIP LOCKED in MongoDB 中描述的情况——先使用 updateOne() 再使用带有适当条件的 find() 就足够,并且可以避免存储前后镜像。

Back to Blog

相关文章

阅读更多 »