UPDATE...RETURNING 在 MongoDB 中:ACID 与幂等性以及 findOneAndUpdate()
Source: Dev.to
请提供您希望翻译的具体文本内容(文章正文),我将把它翻译成简体中文并保持原有的 Markdown 格式、代码块和技术术语不变。谢谢!
单次调用的原子读写操作
您可以在单次调用中实现原子读写操作,而无需显式事务,从而减少往返次数并缩小并发冲突的窗口。
- PostgreSQL – 使用
UPDATE … RETURNING替代先SELECT FOR UPDATE再UPDATE。 - MongoDB – 使用
findOneAndUpdate()替代先updateOne()再find()。
这使得在 MongoDB 中能够进行单个 ACID 读写操作,具备容错性并且可以安全重试,因为它是幂等的。
演示:两次取款和收据
场景 – Bob 使用借记卡取款(不允许出现负余额)。应用程序首先执行 updateOne,仅在余额足够时才扣减金额,然后再单独执行 find() 来打印包含余额的收据。在这两次调用之间,Alice 使用信用卡(允许出现负余额)从同一账户取款,导致 Bob 打印的余额与实际检查的余额不一致。
解决方案 – 使用 findOneAndUpdate() 原子地返回更新后的余额,以实现取款和余额读取的同步。
连接和集合
from pymongo import MongoClient, ReturnDocument
import threading
import time
# Connect to MongoDB
client = MongoClient("mongodb://127.0.0.1:27017/?directConnection=true")
db = client.bank
accounts = db.accounts
# Prepare test account: Bob & Alice share this account
def reset_account():
accounts.drop()
accounts.insert_one({
"_id": "b0b4l3ce",
"owner": ["Bob", "Alice"],
"balance": 100
})
reset_account()
文档视图
bank> db.accounts.find()
[ { _id: 'b0b4l3ce', owner: [ 'Bob', 'Alice' ], balance: 100 } ]
余额被设为 100。
使用 updateOne() 后接 find() 的场景
Bob 的取款(借记卡)
def bob_withdraw_debit(amount):
print("[Bob] Attempting debit withdrawal", amount)
# Application logic checks balance then updates
result = accounts.update_one(
{"_id": "b0b4l3ce", "balance": {"$gte": amount}}, # must have enough money
{"$inc": {"balance": -amount}}
)
# If no document was updated, the filter didn't find enough funds
if result.modified_count == 0:
print("[Bob] Withdrawal denied - insufficient funds")
return
# Simulate processing delay before printing the ticket
time.sleep(1)
# Query the balance for the receipt
balance = accounts.find_one({"_id": "b0b4l3ce"})["balance"]
print(f"[Bob] Debit card ticket: withdrew {amount}, balance after withdrawal: {balance}")
Alice 的取款(信用卡)
def alice_withdraw_credit(amount, delay=0):
time.sleep(delay) # let Bob start first
print("[Alice] Attempting credit withdrawal", amount)
# No balance check for credit cards
accounts.update_one(
{"_id": "b0b4l3ce"},
{"$inc": {"balance": -amount}}
)
print("[Alice] Credit withdrawal complete")
演示脚本(竞争条件)
def demo():
reset_account()
t_bob = threading.Thread(target=bob_withdraw_debit, args=(80,))
t_alice = threading.Thread(target=alice_withdraw_credit, args=(30, 0.5)) # starts just after Bob update
t_bob.start()
t_alice.start()
t_bob.join()
t_alice.join()
输出
>>> demo()
[Bob] Attempting debit withdrawal 80
[Alice] Attempting credit withdrawal 30
[Alice] Credit withdrawal complete
[Bob] Debit card ticket: withdrew 80, balance after withdrawal: -10
Bob 收到的票据显示借记卡余额为负数——这是一个错误 ❌。
使用 findOneAndUpdate()(returnDocument = AFTER)的场景
Bob 的取款(借记卡)
def bob_withdraw_debit(amount):
print("[Bob] Attempting debit withdrawal", amount)
doc = accounts.find_one_and_update(
{"_id": "b0b4l3ce", "balance": {"$gte": amount}},
{"$inc": {"balance": -amount}},
return_document=ReturnDocument.AFTER # get post‑update document atomically
)
# No need to check the update count; we have the document if it was updated
if not doc:
print("[Bob] Withdrawal denied - insufficient funds")
return
# Ticket immediately shows consistent balance
print(f"[Bob] Ticket: withdrew {amount}, balance after withdrawal: {doc['balance']}")
再次运行演示
>>> demo()
[Bob] Attempting debit withdrawal 80
[Bob] Ticket: withdrew 80, balance after withdrawal: 20
awal: 20
[Alice] Attempting credit withdrawal 30
[Alice] Credit withdrawal complete
Bob 现在收到一张票据,显示提款的精确时间点的余额 ✅。
更新写入和更新后读取在文档上作为单个原子操作执行,因而在更新与显示的读取结果之间没有其他写入的机会。
弹性
在 MongoDB 中,读取和写入 不会 像关系型数据库那样获取事务锁,但文档更新仍然在文档层面保持原子性,即使没有显式启动事务。MongoDB 在内部使用轻量级的文档级锁来确保单个文档的 ACID 保证,因为一次更新可能涉及多个内部读写操作(例如,强制唯一约束和更新索引)。
updateOne()只返回元数据(例如更新的文档数量)。findOneAndUpdate()返回更新后的文档本身,读取和写入在单文档层面的同一次原子操作中完成。即使在出现故障的情况下,这种原子性也会被保留。
如果网络中断或主节点崩溃并且次节点被提升为主节点,MongoDB 驱动会在 可重试写入(retryable writes)机制下重新尝试该操作。由于重试必须是幂等的,findOneAndUpdate() 在重试时会返回相同的文档镜像。
为此,MongoDB 会在内部系统集合(config.image_collection)中存储文档镜像——可以是 after 镜像(如本例所示,使用 returnDocument: "after")或 before 镜像。该集合的复制独立于 oplog,作为同一事务的一部分进行:
// Switch to the config database
use config
// View the image collection
db.image_collection.find()
[
{
_id: {
id: UUID('d04e10d6-c61d-42ad-9a44-5bb226a898d8'),
uid: Binary.createFromBase64('47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=', 0)
},
txnNum: Long('15'),
ts: Timestamp({ t: 1767478784, i: 5 }),
imageKind: 'postImage',
image: { _id: 'b0b4l3ce', owner: [ 'Bob', 'Alice' ], balance: 20 },
invalidated: false
}
]
当你启用写入重试时,内部会使用该镜像集合来使写操作对故障具有弹性。此处理对应用程序透明,并提供最强的一致性保证。
与 PostgreSQL 的比较
在 PostgreSQL 中,等价的操作如下:
CREATE TABLE accounts (
id TEXT PRIMARY KEY,
balance NUMERIC,
owners TEXT[]
);
INSERT INTO accounts VALUES ('b0b4l3ce', ARRAY['Bob','Alice'], 100);
-- Alice 的事务
UPDATE accounts
SET balance = balance - 30
WHERE id = 'shared_account';
-- Bob 的事务
UPDATE accounts
SET balance = balance - 80
WHERE id = 'b0b4l3ce' AND balance >= 80
RETURNING balance AS new_balance;
PostgreSQL 驱动 不会 自动重试失败,它们依赖 MVCC 和锁来确保 ACID 属性。
-
可重复读(Repeatable Read) 隔离级别(适用于因为写入
SET balance = balance - 80依赖于读取WHERE balance >= 80):-
Bob 的事务在开始时获取快照。当与 Alice 并发执行时,它仍然看到
balance = 100。 -
如果 Alice 先提交并将余额降至 70,Bob 的事务会因以下错误而失败:
ERROR: could not serialize access due to concurrent update -
应用程序必须自行处理,通过重试整个事务;驱动不会自动完成此操作。
-
-
已提交读(Read Committed)(默认)隔离级别:
- 当 Alice 的更新锁住行时,Bob 的事务会等待。
- Alice 提交后,PostgreSQL 会重新评估 Bob 的
WHERE子句。此时余额已是 70,不再满足balance >= 80,于是UPDATE影响 0 行,取款被拒绝,避免出现负余额。 - 这种方式在只影响单行时工作良好,但在多行语句中可能出现不一致的情况,因为它可能在不同事务状态的行上操作。
结论
MongoDB 支持多文档事务和单文档原子操作,但它 强烈建议 在可能的情况下使用单文档操作。如果您将模式设计为业务逻辑可以放在单个文档中,findOneAndUpdate() 可以:
- 执行条件检查,
- 应用更新,且
- 原子地返回更新后的文档,
从而避免竞争条件异常并实现幂等重试。
在某些情况下——例如在之前的博客文章 FOR UPDATE SKIP LOCKED in MongoDB 中描述的情况——先使用 updateOne() 再使用带有适当条件的 find() 就足够,并且可以避免存储前后镜像。