MongoDB 等价于 FOR UPDATE SKIP LOCKED

发布: (2026年1月2日 GMT+8 07:54)
10 min read
原文: Dev.to

关系型数据库用 SELECT … FOR UPDATE SKIP LOCKED 让多个工作进程安全地并发认领任务。本文演示如何在 MongoDB 中用原子的 find_one_and_update 加上"锁字段 + 过期时间"模式,实现同样的语义,而无需多文档事务。

Python 示例

#!/usr/bin/env python3
"""
MongoDB “SELECT … FOR UPDATE SKIP LOCKED” demo.

Each worker:
  1. Finds a document that is not locked.
  2. Atomically claims it by setting a lock field.
  3. Calls an external API.
  4. Writes the result and releases the lock.
"""

import os
import random
import string
import time
from datetime import datetime, timedelta

import requests
from pymongo import MongoClient, ReturnDocument

# ----------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------
# Connection string; override with the MONGO_URI environment variable.
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
DB_NAME = "demo"        # database holding the work-queue collection
COLL_NAME = "ips"       # one document per IP address to geolocate

LOCK_TTL_SECONDS = 300          # how long a lock is considered valid
API_URL = "http://ip-api.com/json/"   # free geolocation endpoint (plain HTTP)

# ----------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------
def random_ip() -> str:
    """Return a random dotted-quad IPv4 address string."""
    octets = (random.randint(0, 255) for _ in range(4))
    return ".".join(map(str, octets))


def init_data(coll, n=100):
    """Seed *coll* with *n* random-IP documents when it is empty.

    When documents already exist, only prints a notice and returns.
    """
    if coll.estimated_document_count() > 0:
        print("Collection already contains data; skipping init.")
        return
    seed_docs = [{"ip": random_ip(), "processed": False} for _ in range(n)]
    coll.insert_many(seed_docs)
    print(f"Inserted {n} test documents.")


def claim_document(coll, worker_id: str):
    """
    Atomically claim one unlocked, unprocessed document for *worker_id*.

    A document is claimable when it is not yet processed and either
    carries no lock at all or carries a lock whose `until` timestamp has
    already passed.  Returns the claimed document (pre-update snapshot)
    or None when nothing is available.
    """
    now = datetime.utcnow()

    selector = {
        "processed": False,
        "$or": [
            {"lock": {"$exists": False}},
            {"lock.until": {"$lt": now}},
        ],
    }
    lock_doc = {
        "by": worker_id,
        "until": now + timedelta(seconds=LOCK_TTL_SECONDS),
    }

    # ReturnDocument.BEFORE hands back the pre-update snapshot; the
    # caller only needs `_id` and `ip`, which the lock write never touches.
    return coll.find_one_and_update(
        filter=selector,
        update={"$set": {"lock": lock_doc}},
        return_document=ReturnDocument.BEFORE,
    )


def fetch_location(ip: str) -> dict:
    """Look up *ip* via the external IP-API service.

    Returns the decoded JSON payload, or {} on any failure (transport
    error, non-2xx status, or an unparsable response body).
    """
    try:
        response = requests.get(f"{API_URL}{ip}", timeout=5)
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        print(f"API error for {ip}: {exc}")
        return {}


def finalize_document(coll, doc_id, location: dict):
    """Store *location* on the document, mark it processed, drop the lock."""
    changes = {
        "$set": {"location": location, "processed": True},
        "$unset": {"lock": ""},
    }
    coll.update_one({"_id": doc_id}, changes)


# ----------------------------------------------------------------------
# Main worker loop
# ----------------------------------------------------------------------
def worker():
    """Claim→process→release loop; exits once nothing is claimable."""
    coll = MongoClient(MONGO_URI)[DB_NAME][COLL_NAME]

    suffix = "".join(random.choices(string.ascii_lowercase, k=4))
    worker_id = f"{os.getpid()}-{suffix}"
    print(f"Worker {worker_id} started.")

    while True:
        claimed = claim_document(coll, worker_id)
        if claimed is None:
            print("No more work – exiting.")
            break

        ip = claimed["ip"]
        print(f"[{worker_id}] Processing IP {ip} (doc _id={claimed['_id']})")
        finalize_document(coll, claimed["_id"], fetch_location(ip))
        print(f"[{worker_id}] Finished IP {ip}")

        # Randomized pause so parallel workers do not hammer the API.
        time.sleep(random.uniform(0.1, 0.5))


if __name__ == "__main__":
    # Initialise data on first run.
    # NOTE: the original block mixed 4-space and 1-space indentation,
    # which raises IndentationError; re-indented consistently.
    client = MongoClient(MONGO_URI)
    collection = client[DB_NAME][COLL_NAME]
    init_data(collection, n=200)

    # Start the worker (run multiple instances in separate terminals or containers)
    worker()

工作原理

| 步骤 | MongoDB 的操作 | 为何它模仿 FOR UPDATE SKIP LOCKED |
| --- | --- | --- |
| 1. Claim | find_one_and_update 原子地添加一个 lock 子文档(by、until)。 | 该文档现在被工作者"拥有";其他工作者的查询会过滤掉已经拥有未过期锁的文档。 |
| 2. Process | 工作者在任何事务之外调用外部 API。 | 没有长时间运行的操作在 MongoDB 内持有锁,从而避免死锁并保持乐观并发模型的完整性。 |
| 3. Finish | update_one 写入结果并 $unset 锁。 | 文档再次对其他工作者可见(或因 processed: true 而保持隐藏)。 |

运行多个工作进程

打开多个终端(或启动容器),并同时执行脚本:

python3 mongo_skip_locked_demo.py

每个实例将会:

  • 选取一个未锁定的文档,
  • 将其锁定 LOCK_TTL_SECONDS 秒,
  • 处理它,
  • 释放锁。

如果工作进程在释放锁之前崩溃,锁将在 TTL 过后失效,其他工作进程即可获取同一文档——这正是你在 SELECT … FOR UPDATE SKIP LOCKED 中期望的行为。

示例文档

初始文档

{
  "_id": { "$oid": "6956e772baea71e37a818e73" },
  "originatingIp": "1.1.1.1",
  "location": null
}

文档在处理期间

{
  "_id": { "$oid": "6956e772baea71e37a818e73" },
  "originatingIp": "1.1.1.1",
  "location": null,
  "lock": {
    "by": "franck",
    "until": "2026-01-01T22:33:10.833Z"
  }
}

文档处理后

{
  "_id": { "$oid": "6956e772baea71e37a818e73" },
  "originatingIp": "1.1.1.1",
  "location": {
    "status": "success",
    "country": "Hong Kong",
    "countryCode": "HK",
    "region": "HCW",
    "regionName": "Central and Western District",
    "city": "Hong Kong",
    "zip": "",
    "lat": 22.3193,
    "lon": 114.1693,
    "timezone": "Asia/Hong_Kong",
    "isp": "Cloudflare, Inc",
    "org": "APNIC and Cloudflare DNS Resolver project",
    "as": "AS13335 Cloudflare, Inc.",
    "query": "1.1.1.1"
  }
}

完整的 Python 程序(替代实现)

import os
import random
import socket
import threading
import time
from datetime import datetime, timedelta

import requests
from pymongo import MongoClient

# ----------------------------------------------------------------------
# MongoDB connection and collection
# ----------------------------------------------------------------------
client = MongoClient("mongodb://127.0.0.1:27017/?directConnection=true")
db = client.test
messages = db.message   # work queue: one document per IP to geolocate

# ----------------------------------------------------------------------
# Test settings (adjust as needed)
# ----------------------------------------------------------------------
DOCUMENTS = 10          # Number of documents created initially
THREADS   = 5           # Number of worker threads
SECONDS   = 15          # How long each thread loops trying to claim docs

# Worker identity (used for the lock)
WORKER_ID = f"{socket.gethostname()}-{os.getpid()}"
LOCK_DURATION = timedelta(seconds=60)   # Expected max processing time

# Helper to get UTC now
def utcnow() -> datetime:
    """Return the current UTC time as a timezone-aware datetime.

    datetime.utcnow() is deprecated since Python 3.12 and returns a
    naive datetime; an aware UTC value serializes to the same BSON date,
    so the server-side lock comparisons in this script are unaffected.
    """
    from datetime import timezone  # local import: module header lacks it
    return datetime.now(timezone.utc)

# Grace period to tolerate clock skew when checking lock expiry: a lock
# written by another host is treated as expired only once it is at least
# this far past its `until` timestamp.
MAX_CLOCK_SKEW = timedelta(seconds=1)

# ----------------------------------------------------------------------
# Prepare test messages (random IPs)
# ----------------------------------------------------------------------
def insert_test_docs() -> None:
    """Recreate the collection with DOCUMENTS random-IP test documents.

    Drops any previous data, then adds a partial index on lock.until
    that only covers unprocessed documents (location == null), keeping
    the claim query cheap.
    """
    messages.drop()
    # Partial index on the lock field – only unprocessed docs (location: null) are indexed
    messages.create_index(
        [("lock.until", 1)],
        partialFilterExpression={"location": None}
    )

    docs = [
        {
            "originatingIp": ".".join(str(random.randint(1, 255)) for _ in range(4)),
            "location": None,
        }
        for _ in range(DOCUMENTS)
    ]
    messages.insert_many(docs)

    print(f"[STARTUP] Inserted {DOCUMENTS} test docs into 'message'")
    for doc in messages.find({}, {"_id": 0, "originatingIp": 1, "location": 1}):
        print(doc)

# ----------------------------------------------------------------------
# Claim a document for processing
# ----------------------------------------------------------------------
def claim_document() -> dict | None:
    """
    Atomically take ownership of one claimable document.

    Claimable means: unprocessed (location == null) and either never
    locked or holding a lock that expired more than MAX_CLOCK_SKEW ago.
    Returns the post-update document (lock included), or None when no
    document is available.
    """
    from pymongo import ReturnDocument

    now = utcnow()
    claim_filter = {
        "location": None,
        "$or": [
            {"lock": {"$exists": False}},
            {"lock.until": {"$lt": now - MAX_CLOCK_SKEW}},
        ],
    }
    claim_update = {
        "$set": {
            "lock": {
                "by": WORKER_ID,
                "until": now + LOCK_DURATION,
            }
        }
    }

    return messages.find_one_and_update(
        claim_filter,
        claim_update,
        return_document=ReturnDocument.AFTER
    )

# ----------------------------------------------------------------------
# Helper: fetch location from ip‑api.com
# ----------------------------------------------------------------------
def fetch_location(ip: str) -> dict:
    """Query ip-api.com for geolocation data; raises on HTTP errors."""
    resp = requests.get(f"http://ip-api.com/json/{ip}", timeout=5)
    resp.raise_for_status()
    return resp.json()

# ----------------------------------------------------------------------
# Process a single document (claim → fetch → update)
# ----------------------------------------------------------------------
def process_document() -> bool:
    """
    Claim one document, fetch its geolocation, and store the result.

    Returns True when a document was claimed (even when the API call
    fails — the lock is released so another worker can retry later),
    False when nothing was claimable.
    """
    doc = claim_document()
    if doc is None:
        return False

    ip = doc["originatingIp"]
    try:
        location = fetch_location(ip)
    except Exception as exc:
        print(f"[{WORKER_ID}] Failed to fetch location for {ip}: {exc}")
        # Release our lock so the document becomes claimable again.
        messages.update_one(
            {"_id": doc["_id"], "lock.by": WORKER_ID},
            {"$unset": {"lock": ""}}
        )
        return True   # processed (even though it failed)

    # Only finalize while we still hold a valid (unexpired) lock.
    outcome = messages.update_one(
        {
            "_id": doc["_id"],
            "lock.by": WORKER_ID,
            "lock.until": {"$gt": utcnow()},
        },
        {
            "$set": {"location": location},
            "$unset": {"lock": ""}
        }
    )
    if outcome.matched_count:
        print(f"[{WORKER_ID}] Updated document {doc['_id']} with location")
    else:
        print(f"[{WORKER_ID}] Lock lost for {doc['_id']}")
    return True

# ----------------------------------------------------------------------
# Worker thread loop
# ----------------------------------------------------------------------
def worker(stop_event: threading.Event) -> None:
    """Claim and process documents in a loop until *stop_event* is set."""
    while not stop_event.is_set():
        if not process_document():
            # Nothing claimable right now; back off briefly.
            time.sleep(0.1)

# ----------------------------------------------------------------------
# Main entry point
# ----------------------------------------------------------------------
def main() -> None:
    """Seed test data, run THREADS workers for SECONDS, print final state."""
    insert_test_docs()

    stop_event = threading.Event()
    workers = []
    for i in range(THREADS):
        t = threading.Thread(target=worker, args=(stop_event,), name=f"worker-{i}")
        workers.append(t)
        t.start()

    # Let the workers run for the configured duration, then wind down.
    time.sleep(SECONDS)
    stop_event.set()
    for t in workers:
        t.join()

    print("\n=== FINAL DOCUMENTS ===")
    for doc in messages.find().sort("_id"):
        print(doc)

if __name__ == "__main__":
    main()

运行演示

  1. 启动本地 MongoDB 实例(默认 mongodb://127.0.0.1:27017)。

  2. 安装依赖:

    pip install pymongo requests
  3. 执行脚本:

    python your_script_name.py

程序将会:

  • 插入少量包含随机 IP 地址的测试文档。
  • 启动多个工作线程,这些线程会认领文档,从 ip‑api.com 获取其地理位置,并将结果存回 MongoDB。
  • 在配置的运行时间结束后,打印所有文档的最终状态,以便您验证每个文档恰好被处理一次。

要点

  • Lock‑field pattern 提供 SELECT … FOR UPDATE SKIP LOCKED 风格的语义,而无需多文档事务。
  • Partial indexes 即使在大规模集合上,也能保持 claim 查询的快速。
  • Expiration timestamps 使系统能够抵御工作进程崩溃或网络波动。
  • 避免长时间运行的事务,可防止数据库在等待外部服务时持有锁。

欢迎根据工作负载自由调整该模式(不同的锁字段、锁持续时间、重试逻辑)。

认领文档(独立代码片段)

# NOTE(review): this fragment uses bare `return`, so it is intended to be
# pasted inside a claim function (cf. claim_document above); it is not a
# runnable module-level script.
# Get the current time and compute the lock expiry
now = utcnow()
lock_expiry = now + LOCK_DURATION
token = random.randint(1000, 9999)          # unique lock token for extra safety
# NOTE(review): only 9000 possible token values — collisions are unlikely
# but not impossible; a uuid4 would be safer if this pattern is kept.

# Atomic lock claim: match unlocked documents or locks that have expired
result = messages.update_one(
    {
        "$and": [
            {"location": None},
            {
                "$or": [
                    {"lock": {"$exists": False}},
                    {"lock.until": {"$lt": now - MAX_CLOCK_SKEW}},
                ]
            },
        ]
    },
    {"$set": {"lock": {"by": WORKER_ID, "until": lock_expiry, "token": token}}}
)

# No document matched (or the lock was already ours with identical values).
if result.modified_count == 0:
    return None

# Re-read the document we just locked; the (by, until, token) triple
# identifies exactly the lock written above.
doc = messages.find_one(
    {"lock.by": WORKER_ID, "lock.until": lock_expiry, "lock.token": token}
)

# NOTE(review): update_one + find_one is two round-trips; a single
# find_one_and_update (as in claim_document) would be atomic and avoid
# the "claim succeeded but fetch failed" window handled below.
if doc:
    print(f"[{WORKER_ID}] claimed IP {doc['originatingIp']} with token={token}")
else:
    print(f"[{WORKER_ID}] claim succeeded but fetch failed — possible race?")
return doc

调用公共 API(独立代码片段)

import requests
from typing import Optional, Dict, Any

def fetch_location(ip: str) -> Optional[Dict[str, Any]]:
    """
    Retrieve geolocation information for an IP address using ip-api.com.

    Returns the decoded JSON payload on HTTP 200; otherwise logs the
    problem (non-200 status, transport or decode error) and returns None.
    """
    try:
        response = requests.get(f"http://ip-api.com/json/{ip}", timeout=30)
        if response.status_code != 200:
            print(f"[API] Error: HTTP {response.status_code} for {ip}")
            return None
        return response.json()
    except Exception as exc:  # pragma: no cover
        print(f"[API] Exception for {ip}: {exc}")
        return None

在循环中处理消息(独立代码片段)

Back to Blog

相关文章

阅读更多 »