MongoDB equivalent to FOR UPDATE SKIP LOCKED

Published: (January 1, 2026 at 06:54 PM EST)
8 min read
Source: Dev.to

Python example

#!/usr/bin/env python3
"""
MongoDB “SELECT … FOR UPDATE SKIP LOCKED” demo.

Each worker:
  1. Finds a document that is not locked.
  2. Atomically claims it by setting a lock field.
  3. Calls an external API.
  4. Writes the result and releases the lock.
"""

import os
import random
import string
import time
from datetime import datetime, timedelta

import requests
from pymongo import MongoClient, ReturnDocument

# ----------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
DB_NAME = "demo"
COLL_NAME = "ips"

LOCK_TTL_SECONDS = 300          # how long a lock is considered valid
API_URL = "http://ip-api.com/json/"

# ----------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------
def random_ip() -> str:
    """Build a random dotted-quad IPv4 string (each octet in 0-255)."""
    octets = (random.randint(0, 255) for _ in range(4))
    return ".".join(map(str, octets))


def init_data(coll, n=100):
    """Seed *coll* with *n* random-IP documents, but only when it is empty.

    Each seeded document starts with ``processed: False`` so workers can
    claim it later.  A non-empty collection is left untouched.
    """
    if coll.estimated_document_count() > 0:
        print("Collection already contains data; skipping init.")
        return
    seed = [{"ip": random_ip(), "processed": False} for _ in range(n)]
    coll.insert_many(seed)
    print(f"Inserted {n} test documents.")


def claim_document(coll, worker_id: str):
    """Atomically take ownership of one unprocessed, unlocked document.

    A document is eligible when it is not processed AND either carries no
    lock at all or carries a lock whose ``until`` timestamp is already in
    the past.  The claim (filter + lock write) happens in one atomic
    find_one_and_update, so two workers can never claim the same document.

    Returns the matched document's *pre-update* image, or None when
    nothing is claimable.
    """
    now = datetime.utcnow()  # naive UTC, matching the stored lock.until values
    expiry = now + timedelta(seconds=LOCK_TTL_SECONDS)

    claimable = {
        "processed": False,
        "$or": [
            {"lock": {"$exists": False}},
            {"lock.until": {"$lt": now}},
        ],
    }
    take_lock = {"$set": {"lock": {"by": worker_id, "until": expiry}}}

    # ReturnDocument.BEFORE: we get the document as it looked before our
    # lock was written; None means no document matched the filter.
    return coll.find_one_and_update(
        claimable,
        take_lock,
        return_document=ReturnDocument.BEFORE,
    )


def fetch_location(ip: str) -> dict:
    """Look up geolocation data for *ip* via the external IP-API service.

    Any failure (network error, HTTP error, bad JSON) is logged and an
    empty dict is returned instead of raising.
    """
    try:
        response = requests.get(f"{API_URL}{ip}", timeout=5)
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        print(f"API error for {ip}: {exc}")
        return {}


def finalize_document(coll, doc_id, location: dict):
    """Persist *location* on the claimed document and release its lock.

    Marks the document processed in the same update, so it never becomes
    claimable again.
    """
    changes = {
        "$set": {"location": location, "processed": True},
        "$unset": {"lock": ""},
    }
    coll.update_one({"_id": doc_id}, changes)


# ----------------------------------------------------------------------
# Main worker loop
# ----------------------------------------------------------------------
def worker():
    """Claim→process→release loop; exits once nothing is claimable."""
    coll = MongoClient(MONGO_URI)[DB_NAME][COLL_NAME]

    # pid + random suffix distinguishes workers on the same host.
    suffix = "".join(random.choices(string.ascii_lowercase, k=4))
    worker_id = f"{os.getpid()}-{suffix}"
    print(f"Worker {worker_id} started.")

    while True:
        doc = claim_document(coll, worker_id)
        if not doc:
            print("No more work – exiting.")
            break

        ip = doc["ip"]
        print(f"[{worker_id}] Processing IP {ip} (doc _id={doc['_id']})")
        finalize_document(coll, doc["_id"], fetch_location(ip))
        print(f"[{worker_id}] Finished IP {ip}")

        # Small randomized pause so concurrent workers don't hammer the API.
        time.sleep(random.uniform(0.1, 0.5))


if __name__ == "__main__":
    # Seed the collection on the first run, then start a single worker.
    # Launch this script in multiple terminals/containers for concurrency.
    bootstrap = MongoClient(MONGO_URI)
    init_data(bootstrap[DB_NAME][COLL_NAME], n=200)
    worker()

How it works

| Step | What MongoDB does | Why it mimics FOR UPDATE SKIP LOCKED |
|------|-------------------|--------------------------------------|
| 1. Claim | `find_one_and_update` atomically adds a `lock` sub-document (`by`, `until`). | The document is now "owned" by a worker; other workers' queries filter out any document that already has a non-expired lock. |
| 2. Process | The worker calls the external API outside of any transaction. | No long-running operation holds a lock inside MongoDB, avoiding dead-locks and keeping the optimistic concurrency model intact. |
| 3. Finish | `update_one` writes the result and `$unset`s the lock. | The document becomes visible again for other workers (or stays hidden because `processed: true`). |

Running multiple workers

Open several terminals (or spin up containers) and execute the script simultaneously:

python3 mongo_skip_locked_demo.py

Each instance will:

  • pick an unlocked document,
  • lock it for LOCK_TTL_SECONDS,
  • process it,
  • release the lock.

If a worker crashes before releasing the lock, the lock expires after the TTL and another worker can claim the same document—exactly the behaviour you expect from SELECT … FOR UPDATE SKIP LOCKED.

Example Documents

Initial document

{
  "_id": { "$oid": "6956e772baea71e37a818e73" },
  "originatingIp": "1.1.1.1",
  "location": null
}

Document while it is being processed

{
  "_id": { "$oid": "6956e772baea71e37a818e73" },
  "originatingIp": "1.1.1.1",
  "location": null,
  "lock": {
    "by": "franck",
    "until": "2026-01-01T22:33:10.833Z"
  }
}

Document after processing

{
  "_id": { "$oid": "6956e772baea71e37a818e73" },
  "originatingIp": "1.1.1.1",
  "location": {
    "status": "success",
    "country": "Hong Kong",
    "countryCode": "HK",
    "region": "HCW",
    "regionName": "Central and Western District",
    "city": "Hong Kong",
    "zip": "",
    "lat": 22.3193,
    "lon": 114.1693,
    "timezone": "Asia/Hong_Kong",
    "isp": "Cloudflare, Inc",
    "org": "APNIC and Cloudflare DNS Resolver project",
    "as": "AS13335 Cloudflare, Inc.",
    "query": "1.1.1.1"
  }
}

Complete Python program (alternative implementation)

import os
import random
import socket
import threading
import time
from datetime import datetime, timedelta

import requests
from pymongo import MongoClient, ReturnDocument

# ----------------------------------------------------------------------
# MongoDB connection and collection
# ----------------------------------------------------------------------
# One client/collection shared by every worker thread in this process
# (pymongo's MongoClient is designed to be shared across threads — see
# the pymongo FAQ; verify for your driver version).
client = MongoClient("mongodb://127.0.0.1:27017/?directConnection=true")
db = client.test
messages = db.message

# ----------------------------------------------------------------------
# Test settings (adjust as needed)
# ----------------------------------------------------------------------
DOCUMENTS = 10          # Number of documents created initially
THREADS   = 5           # Number of worker threads
SECONDS   = 15          # How long each thread loops trying to claim docs

# Worker identity (used for the lock); hostname+pid identifies this process
WORKER_ID = f"{socket.gethostname()}-{os.getpid()}"
LOCK_DURATION = timedelta(seconds=60)   # Expected max processing time

# Helper to get UTC now
def utcnow() -> datetime:
    """Return the current UTC time as a *naive* datetime.

    datetime.utcnow() is deprecated since Python 3.12; build the same
    naive-UTC value from an aware timestamp instead.  Keeping the result
    naive matches the lock.until values stored in MongoDB.
    """
    from datetime import timezone  # local: module imports only datetime/timedelta
    return datetime.now(timezone.utc).replace(tzinfo=None)

# Grace period to tolerate clock skew when checking lock expiry:
# an expired lock is only stolen once it is this far past its "until".
MAX_CLOCK_SKEW = timedelta(seconds=1)

# ----------------------------------------------------------------------
# Prepare test messages (random IPs)
# ----------------------------------------------------------------------
def insert_test_docs() -> None:
    """Recreate the 'message' collection with DOCUMENTS random-IP docs.

    Drops any previous data, adds a partial index on lock.until covering
    only unprocessed documents (location == null), inserts the test rows,
    and echoes them for inspection.
    """
    messages.drop()
    # Index only documents the claim query can still match.
    messages.create_index(
        [("lock.until", 1)],
        partialFilterExpression={"location": None}
    )

    docs = []
    for _ in range(DOCUMENTS):
        ip = ".".join(str(random.randint(1, 255)) for _ in range(4))
        docs.append({"originatingIp": ip, "location": None})
    messages.insert_many(docs)

    print(f"[STARTUP] Inserted {DOCUMENTS} test docs into 'message'")
    for doc in messages.find({}, {"_id": 0, "originatingIp": 1, "location": 1}):
        print(doc)

# ----------------------------------------------------------------------
# Claim a document for processing
# ----------------------------------------------------------------------
def claim_document() -> dict | None:
    """
    Atomically claim one unprocessed, unlocked document.

    A document is claimable when:
      * it is unprocessed (location == null), and
      * it has no lock, or its lock expired more than MAX_CLOCK_SKEW ago.

    The lock write and the read happen in a single find_one_and_update,
    so two workers can never claim the same document.  Returns the
    post-update document (lock visible), or None if nothing is claimable.
    """
    now = utcnow()
    lock_until = now + LOCK_DURATION

    filter_query = {
        "location": None,
        "$or": [
            {"lock": {"$exists": False}},
            # Grace period avoids stealing a lock over small clock skew.
            {"lock.until": {"$lt": now - MAX_CLOCK_SKEW}}
        ]
    }

    update = {
        "$set": {
            "lock": {
                "by": WORKER_ID,
                "until": lock_until
            }
        }
    }

    # ReturnDocument is imported at module level (top-of-file imports),
    # instead of re-importing it on every call.
    return messages.find_one_and_update(
        filter_query,
        update,
        return_document=ReturnDocument.AFTER
    )

# ----------------------------------------------------------------------
# Helper: fetch location from ip‑api.com
# ----------------------------------------------------------------------
def fetch_location(ip: str) -> dict:
    """Fetch geolocation JSON for *ip* from ip-api.com; raises on HTTP/network error."""
    response = requests.get(f"http://ip-api.com/json/{ip}", timeout=5)
    response.raise_for_status()
    return response.json()

# ----------------------------------------------------------------------
# Process a single document (claim → fetch → update)
# ----------------------------------------------------------------------
def process_document() -> bool:
    """
    Take one document through claim → fetch → store → unlock.

    Returns False when no document could be claimed (queue drained);
    True otherwise — even when the API call failed, since a claim was
    consumed either way.
    """
    doc = claim_document()
    if not doc:
        return False

    ip = doc["originatingIp"]
    try:
        location = fetch_location(ip)
    except Exception as exc:
        # API failure: hand the document back by clearing our lock only.
        print(f"[{WORKER_ID}] Failed to fetch location for {ip}: {exc}")
        messages.update_one(
            {"_id": doc["_id"], "lock.by": WORKER_ID},
            {"$unset": {"lock": ""}}
        )
        return True   # processed (even though it failed)

    # Only write the result while we still hold an unexpired lock;
    # otherwise another worker may have (legitimately) stolen it.
    still_ours = {
        "_id": doc["_id"],
        "lock.by": WORKER_ID,
        "lock.until": {"$gt": utcnow()}
    }
    result = messages.update_one(
        still_ours,
        {"$set": {"location": location}, "$unset": {"lock": ""}}
    )
    if result.matched_count:
        print(f"[{WORKER_ID}] Updated document {doc['_id']} with location")
    else:
        print(f"[{WORKER_ID}] Lock lost for {doc['_id']}")
    return True

# ----------------------------------------------------------------------
# Worker thread loop
# ----------------------------------------------------------------------
def worker(stop_event: threading.Event) -> None:
    """Claim and process documents in a loop until *stop_event* fires."""
    while not stop_event.is_set():
        if not process_document():
            # Nothing claimable right now — brief pause before retrying.
            time.sleep(0.1)

# ----------------------------------------------------------------------
# Main entry point
# ----------------------------------------------------------------------
def main() -> None:
    """Seed the collection, run THREADS workers for SECONDS, then report."""
    insert_test_docs()

    stop_event = threading.Event()
    pool = []
    for i in range(THREADS):
        t = threading.Thread(target=worker, args=(stop_event,), name=f"worker-{i}")
        t.start()
        pool.append(t)

    # Let the pool run for the configured duration, then wind it down.
    time.sleep(SECONDS)
    stop_event.set()
    for t in pool:
        t.join()

    print("\n=== FINAL DOCUMENTS ===")
    for doc in messages.find().sort("_id"):
        print(doc)

# Run the demo only when executed as a script (not when imported).
if __name__ == "__main__":
    main()

Running the demo

  1. Start a local MongoDB instance (default mongodb://127.0.0.1:27017).

  2. Install dependencies:

    pip install pymongo requests
  3. Execute the script:

    python your_script_name.py

The program will:

  • Insert a handful of test documents containing random IP addresses.
  • Spin up several worker threads that claim documents, fetch their geographic location from ip‑api.com, and store the result back in MongoDB.
  • After the configured run time, print the final state of all documents so you can verify that each one has been processed exactly once.

Takeaways

  • Lock‑field pattern provides SELECT … FOR UPDATE SKIP LOCKED‑style semantics without requiring multi‑document transactions.
  • Partial indexes keep the claim query fast even on massive collections.
  • Expiration timestamps make the system resilient to worker crashes or network hiccups.
  • Avoiding long‑running transactions prevents the database from holding locks while waiting on external services.

Feel free to adapt the pattern (different lock fields, lock durations, retry logic) to suit your workload.

Claim a document (stand‑alone snippet)

# NOTE(review): this snippet is a function *body* (it uses `return`); it is
# meant to be pasted inside a claim function, not executed at module level.

# Get the current time and compute the lock expiry
now = utcnow()
lock_expiry = now + LOCK_DURATION
token = random.randint(1000, 9999)          # unique lock token for extra safety

# Atomic lock claim: match unlocked documents or locks that have expired
result = messages.update_one(
    {
        "$and": [
            {"location": None},
            {
                "$or": [
                    {"lock": {"$exists": False}},
                    {"lock.until": {"$lt": now - MAX_CLOCK_SKEW}},
                ]
            },
        ]
    },
    {"$set": {"lock": {"by": WORKER_ID, "until": lock_expiry, "token": token}}}
)

# modified_count == 0 means nothing was claimable (all locked/processed).
if result.modified_count == 0:
    return None

# Re-fetch the document we just locked; presumably the token disambiguates
# locks with identical by/until pairs — confirm against the claim logic.
doc = messages.find_one(
    {"lock.by": WORKER_ID, "lock.until": lock_expiry, "lock.token": token}
)

if doc:
    print(f"[{WORKER_ID}] claimed IP {doc['originatingIp']} with token={token}")
else:
    print(f"[{WORKER_ID}] claim succeeded but fetch failed — possible race?")
return doc

Call the public API (stand‑alone snippet)

import requests
from typing import Optional, Dict, Any

def fetch_location(ip: str) -> Optional[Dict[str, Any]]:
    """
    Retrieve geolocation information for an IP address using ip‑api.com.

    Returns the parsed JSON payload on HTTP 200; any other status or any
    exception is logged and None is returned (nothing is raised).
    """
    url = f"http://ip-api.com/json/{ip}"
    try:
        response = requests.get(url, timeout=30)
        if response.status_code != 200:
            print(f"[API] Error: HTTP {response.status_code} for {ip}")
            return None
        return response.json()
    except Exception as exc:  # pragma: no cover
        print(f"[API] Exception for {ip}: {exc}")
        return None

Process messages in a loop (stand‑alone snippet)

Back to Blog

Related posts

Read more »