MongoDB에서 FOR UPDATE SKIP LOCKED에 해당하는 기능

발행: (2026년 1월 2일 오전 08:54 GMT+9)
15 min read
원문: Dev.to

Source: Dev.to

MongoDB에서 FOR UPDATE SKIP LOCKED와 동일한 기능 구현하기

PostgreSQL에서는

SELECT * FROM table_name
WHERE condition
FOR UPDATE SKIP LOCKED;

와 같이 잠긴 행을 건너뛰면서 레코드를 락할 수 있습니다.
MongoDB는 아직 FOR UPDATE SKIP LOCKED와 정확히 같은 구문을 제공하지 않지만, 비슷한 동작을 구현할 수 있는 몇 가지 패턴이 있습니다.

1. findOneAndUpdate(또는 findAndModify) 사용

가장 일반적인 방법은 문서를 바로 업데이트하면서 동시에 “잠금” 플래그를 설정하는 것입니다.

const result = await db.collection('jobs').findOneAndUpdate(
  {
    status: 'pending',          // 아직 처리되지 않은 작업
    locked: { $ne: true }       // 현재 잠겨 있지 않은 작업
  },
  {
    $set: { locked: true, lockedAt: new Date() }
  },
  {
    sort: { priority: -1 },    // 우선순위가 높은 작업을 먼저 잡음
    returnDocument: 'after'     // 업데이트된 문서를 반환
  }
);
  • 쿼리 조건locked: { $ne: true }를 넣어 이미 잠긴 문서는 제외합니다.
  • sort 옵션을 사용해 우선순위생성 시간 등 원하는 기준으로 문서를 선택합니다.
  • 업데이트가 성공하면 해당 문서는 즉시 locked: true 로 표시되므로, 다른 워커가 동일한 문서를 잡지 못합니다.

2. TTL 인덱스로 잠금 자동 해제

잠금이 영구적으로 남아 있으면 작업이 영원히 진행되지 않을 수 있습니다.
이를 방지하기 위해 TTL 인덱스를 사용해 일정 시간이 지나면 자동으로 잠금을 해제하도록 할 수 있습니다.

// 1. 잠금 필드에 TTL 인덱스 생성 (예: 5분 후 자동 삭제)
await db.collection('jobs').createIndex(
  { lockedAt: 1 },
  { expireAfterSeconds: 300 }   // 5분
);

위와 같이 설정하면 lockedAt 필드가 5분 이상 존재하면 해당 문서가 자동으로 삭제되거나(필요에 따라 locked 필드만 초기화하도록 별도 스크립트를 작성) 잠금이 해제됩니다.

3. 작업 완료 후 잠금 해제

작업이 끝났을 때는 locked 플래그와 lockedAt 필드를 다시 정리해 줍니다.

await db.collection('jobs').updateOne(
  { _id: result.value._id },
  {
    $set: { status: 'completed' },
    $unset: { locked: "", lockedAt: "" }
  }
);

4. 여러 워커가 동시에 작업을 잡는 경우

동시성 문제를 최소화하려면 원자적인 findOneAndUpdate 호출을 반드시 사용해야 합니다.
MongoDB는 단일 문서에 대한 업데이트를 원자적으로 보장하므로, 위 패턴을 그대로 적용하면 두 개 이상의 워커가 같은 문서를 동시에 잡는 상황을 방지할 수 있습니다.

5. 전체 예시

async function fetchAndLockJob() {
  const job = await db.collection('jobs').findOneAndUpdate(
    {
      status: 'pending',
      locked: { $ne: true }
    },
    {
      $set: { locked: true, lockedAt: new Date() }
    },
    {
      sort: { priority: -1, createdAt: 1 },
      returnDocument: 'after'
    }
  );

  if (!job.value) {
    // 잠긴 작업이 없거나 대기 중인 작업이 없을 때
    return null;
  }

  return job.value;
}

정리

  • MongoDB에는 FOR UPDATE SKIP LOCKED와 정확히 일치하는 구문이 없지만, findOneAndUpdate + 잠금 플래그 조합으로 동일한 효과를 낼 수 있습니다.
  • TTL 인덱스를 활용해 잠금 자동 해제 메커니즘을 구현하면, 워커가 비정상 종료될 경우에도 시스템이 멈추지 않습니다.
  • 항상 원자적인 업데이트를 사용하고, 작업이 끝난 뒤에는 잠금을 반드시 해제하도록 코드를 작성하세요.

이 패턴을 적용하면 PostgreSQL에서 사용하던 FOR UPDATE SKIP LOCKED와 유사한 동시성 제어를 MongoDB에서도 안전하게 구현할 수 있습니다.

Python 예제

#!/usr/bin/env python3
"""
MongoDB “SELECT … FOR UPDATE SKIP LOCKED” demo.

Each worker:
  1. Finds a document that is not locked.
  2. Atomically claims it by setting a lock field.
  3. Calls an external API.
  4. Writes the result and releases the lock.
"""

import os
import random
import string
import time
from datetime import datetime, timedelta

import requests
from pymongo import MongoClient, ReturnDocument

# ----------------------------------------------------------------------
# Configuration
# ----------------------------------------------------------------------
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
DB_NAME = "demo"
COLL_NAME = "ips"

LOCK_TTL_SECONDS = 300          # how long a lock is considered valid
API_URL = "http://ip-api.com/json/"

# ----------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------
def random_ip() -> str:
    """Generate a random IPv4 address."""
    return ".".join(str(random.randint(0, 255)) for _ in range(4))


def init_data(coll, n=100):
    """Insert *n* documents with random IPs if the collection is empty."""
    if coll.estimated_document_count() == 0:
        docs = [{"ip": random_ip(), "processed": False} for _ in range(n)]
        coll.insert_many(docs)
        print(f"Inserted {n} test documents.")
    else:
        print("Collection already contains data; skipping init.")


def claim_document(coll, worker_id: str):
    """
    Atomically claim the first unlocked document.

    The query looks for:
      - documents that are not processed,
      - and either have no lock or have an expired lock.
    """
    now = datetime.utcnow()
    lock_until = now + timedelta(seconds=LOCK_TTL_SECONDS)

    query = {
        "processed": False,
        "$or": [
            {"lock": {"$exists": False}},
            {"lock.until": {"$lt": now}},
        ],
    }

    update = {
        "$set": {
            "lock": {
                "by": worker_id,
                "until": lock_until,
            }
        }
    }

    # findOneAndUpdate returns the *pre‑update* document, i.e. the one we just claimed
    doc = coll.find_one_and_update(
        filter=query,
        update=update,
        return_document=ReturnDocument.BEFORE,
    )
    return doc


def fetch_location(ip: str) -> dict:
    """Call the external IP‑API service and return the JSON payload."""
    try:
        resp = requests.get(f"{API_URL}{ip}", timeout=5)
        resp.raise_for_status()
        return resp.json()
    except Exception as exc:
        print(f"API error for {ip}: {exc}")
        return {}


def finalize_document(coll, doc_id, location: dict):
    """Write the location data and remove the lock."""
    coll.update_one(
        {"_id": doc_id},
        {
            "$set": {
                "location": location,
                "processed": True,
            },
            "$unset": {"lock": ""},
        },
    )


# ----------------------------------------------------------------------
# Main worker loop
# ----------------------------------------------------------------------
def worker():
    client = MongoClient(MONGO_URI)
    coll = client[DB_NAME][COLL_NAME]

    worker_id = f"{os.getpid()}-{''.join(random.choices(string.ascii_lowercase, k=4))}"
    print(f"Worker {worker_id} started.")

    while True:
        doc = claim_document(coll, worker_id)
        if not doc:
            print("No more work – exiting.")
            break

        ip = doc["ip"]
        print(f"[{worker_id}] Processing IP {ip} (doc _id={doc['_id']})")
        location = fetch_location(ip)

        finalize_document(coll, doc["_id"], location)
        print(f"[{worker_id}] Finished IP {ip}")

        # optional back‑off to avoid hammering the API
        time.sleep(random.uniform(0.1, 0.5))


if __name__ == "__main__":
    # Initialise data on first run
    client = MongoClient(MONGO_URI)
collection = client[DB_NAME][COLL_NAME]
init_data(collection, n=200)

# Start the worker (run multiple instances in separate terminals or containers)
worker()

작동 방식

단계MongoDB가 수행하는 작업FOR UPDATE SKIP LOCKED을 모방하는 이유
1. 클레임find_one_and_update가 원자적으로 lock 서브문서(by, until)를 추가합니다.이제 문서는 작업자에게 “소유”되며, 다른 작업자들의 쿼리는 이미 만료되지 않은 잠금이 있는 문서를 필터링합니다.
2. 처리작업자는 외부 API를 트랜잭션 외부에서 호출합니다.MongoDB 내부에서 장시간 실행되는 작업이 잠금을 보유하지 않으므로 교착 상태를 방지하고 낙관적 동시성 모델을 유지합니다.
3. 완료update_one이 결과를 기록하고 $unset으로 잠금을 해제합니다.문서는 다시 다른 작업자에게 보이게 되며(processed: true인 경우 숨겨진 상태를 유지합니다).

여러 워커 실행

여러 터미널을 열거나(또는 컨테이너를 띄워) 스크립트를 동시에 실행합니다:

python3 mongo_skip_locked_demo.py

각 인스턴스는:

  • 잠금되지 않은 문서를 선택하고,
  • LOCK_TTL_SECONDS 동안 잠그고,
  • 처리하고,
  • 잠금을 해제합니다.

워커가 잠금을 해제하기 전에 충돌하면, TTL이 지나 잠금이 만료되고 다른 워커가 동일한 문서를 가져올 수 있습니다—바로 SELECT … FOR UPDATE SKIP LOCKED에서 기대하는 동작과 같습니다.

예시 문서

초기 문서

{
  "_id": { "$oid": "6956e772baea71e37a818e73" },
  "originatingIp": "1.1.1.1",
  "location": null
}

처리 중인 문서

{
  "_id": { "$oid": "6956e772baea71e37a818e73" },
  "originatingIp": "1.1.1.1",
  "location": null,
  "lock": {
    "by": "franck",
    "until": "2026-01-01T22:33:10.833Z"
  }
}

처리 후 문서

{
  "_id": { "$oid": "6956e772baea71e37a818e73" },
  "originatingIp": "1.1.1.1",
  "location": {
    "status": "success",
    "country": "Hong Kong",
    "countryCode": "HK",
    "region": "HCW",
    "regionName": "Central and Western District",
    "city": "Hong Kong",
    "zip": "",
    "lat": 22.3193,
    "lon": 114.1693,
    "timezone": "Asia/Hong_Kong",
    "isp": "Cloudflare, Inc",
    "org": "APNIC and Cloudflare DNS Resolver project",
    "as": "AS13335 Cloudflare, Inc.",
    "query": "1.1.1.1"
  }
}

전체 파이썬 프로그램 (대체 구현)

import os
import random
import socket
import threading
import time
from datetime import datetime, timedelta

import requests
from pymongo import MongoClient

# ----------------------------------------------------------------------
# MongoDB connection and collection
# ----------------------------------------------------------------------
client = MongoClient("mongodb://127.0.0.1:27017/?directConnection=true")
db = client.test
messages = db.message

# ----------------------------------------------------------------------
# Test settings (adjust as needed)
# ----------------------------------------------------------------------
DOCUMENTS = 10          # Number of documents created initially
THREADS   = 5           # Number of worker threads
SECONDS   = 15          # How long each thread loops trying to claim docs

# Worker identity (used for the lock)
WORKER_ID = f"{socket.gethostname()}-{os.getpid()}"
LOCK_DURATION = timedelta(seconds=60)   # Expected max processing time

# Helper to get UTC now
def utcnow() -> datetime:
    return datetime.utcnow()

# Grace period to tolerate clock skew when checking lock expiry
MAX_CLOCK_SKEW = timedelta(seconds=1)

# ----------------------------------------------------------------------
# Prepare test messages (random IPs)
# ----------------------------------------------------------------------
def insert_test_docs() -> None:
    """Drop the collection, create a partial index, and insert test docs."""
    messages.drop()
    # Partial index on the lock field – only unprocessed docs (location: null) are indexed
    messages.create_index(
        [("lock.until", 1)],
        partialFilterExpression={"location": None}
    )

    ips = [
        ".".join(str(random.randint(1, 255)) for _ in range(4))
        for _ in range(DOCUMENTS)
    ]

    docs = [{"originatingIp": ip, "location": None} for ip in ips]

    messages.insert_many(docs)
    print(f"[STARTUP] Inserted {DOCUMENTS} test docs into 'message'")
    for doc in messages.find({}, {"_id": 0, "originatingIp": 1, "location": 1}):
        print(doc)

# ----------------------------------------------------------------------
# Claim a document for processing
# ----------------------------------------------------------------------
def claim_document() -> dict | None:
    """
    Atomically claim a document that is:
      * unprocessed (location == null)
      * not locked, or lock has expired (with a 1‑second grace)
    Returns the claimed document, or None if none are available.
    """
    now = utcnow()
    lock_until = now + LOCK_DURATION

    filter_query = {
        "location": None,
        "$or": [
            {"lock": {"$exists": False}},
            {"lock.until": {"$lt": now - MAX_CLOCK_SKEW}}
        ]
    }

    update = {
        "$set": {
            "lock": {
                "by": WORKER_ID,
                "until": lock_until
            }
        }
    }

    from pymongo import ReturnDocument
    doc = messages.find_one_and_update(
        filter_query,
        update,
        return_document=ReturnDocument.AFTER
    )
    return doc

# ----------------------------------------------------------------------
# Helper: fetch location from ip‑api.com
# ----------------------------------------------------------------------
def fetch_location(ip: str) -> dict:
    """Query ip‑api.com for location data."""
    url = f"http://ip-api.com/json/{ip}"
    resp = requests.get(url, timeout=5)
    resp.raise_for_status()
    return resp.json()

# ----------------------------------------------------------------------
# Process a single document (claim → fetch → update)
# ----------------------------------------------------------------------
def process_document() -> bool:
    """
    Claims a document, fetches its location, and updates it.
    Returns True if a document was processed, False otherwise.
    """
    doc = claim_document()
    if not doc:
        return False

Source:

  ip = doc["originatingIp"]
    try:
        location = fetch_location(ip)
    except Exception as exc:
        print(f"[{WORKER_ID}] Failed to fetch location for {ip}: {exc}")
        messages.update_one(
            {"_id": doc["_id"], "lock.by": WORKER_ID},
            {"$unset": {"lock": ""}}
        )
        return True   # processed (even though it failed)

    now = utcnow()
    result = messages.update_one(
        {
            "_id": doc["_id"],
            "lock.by": WORKER_ID,
            "lock.until": {"$gt": now}
        },
        {
            "$set": {"location": location},
            "$unset": {"lock": ""}
        }
    )
    if result.matched_count == 0:
        print(f"[{WORKER_ID}] Lock lost for {doc['_id']}")
    else:
        print(f"[{WORKER_ID}] Updated document {doc['_id']} with location")
    return True

# ----------------------------------------------------------------------
# Worker thread loop
# ----------------------------------------------------------------------
def worker(stop_event: threading.Event) -> None:
    """Continuously claim and process documents until the stop event is set."""
    while not stop_event.is_set():
        processed = process_document()
        if not processed:
            time.sleep(0.1)

# ----------------------------------------------------------------------
# Main entry point
# ----------------------------------------------------------------------
def main() -> None:
    insert_test_docs()

    stop_event = threading.Event()
    threads = [
        threading.Thread(target=worker, args=(stop_event,), name=f"worker-{i}")
        for i in range(THREADS)
    ]

    for t in threads:
        t.start()

    time.sleep(SECONDS)
    stop_event.set()

    for t in threads:
        t.join()

    print("\n=== FINAL DOCUMENTS ===")
    for doc in messages.find().sort("_id"):
        print(doc)

if __name__ == "__main__":
    main()

데모 실행

  1. 로컬 MongoDB 인스턴스 시작 (기본 mongodb://127.0.0.1:27017).

  2. 의존성 설치:

    pip install pymongo requests
  3. 스크립트 실행:

    python your_script_name.py

프로그램은 다음을 수행합니다:

  • 무작위 IP 주소를 포함하는 테스트 문서 몇 개를 삽입합니다.
  • 문서를 가져와 ip‑api.com에서 지리적 위치를 조회하고, 결과를 MongoDB에 다시 저장하는 여러 워커 스레드를 시작합니다.
  • 설정된 실행 시간이 지나면 모든 문서의 최종 상태를 출력하여 각 문서가 정확히 한 번씩 처리되었는지 확인할 수 있습니다.

요약

  • Lock‑field 패턴은 다중 문서 트랜잭션 없이 SELECT … FOR UPDATE SKIP LOCKED 스타일의 의미를 제공합니다.
  • Partial 인덱스는 대규모 컬렉션에서도 클레임 쿼리를 빠르게 유지합니다.
  • Expiration 타임스탬프는 워커 충돌이나 네트워크 문제에 시스템이 견딜 수 있게 합니다.
  • 장시간 실행되는 트랜잭션을 피하면 외부 서비스 대기 중에 데이터베이스가 락을 잡고 있는 것을 방지합니다.

패턴을 자유롭게 조정하세요(다른 lock 필드, lock 기간, 재시도 로직 등)하여 워크로드에 맞추세요.

문서 클레임 (독립형 스니펫)

# Get the current time and compute the lock expiry
now = utcnow()
lock_expiry = now + LOCK_DURATION
token = random.randint(1000, 9999)          # unique lock token for extra safety

# Atomic lock claim: match unlocked documents or locks that have expired
result = messages.update_one(
    {
        "$and": [
            {"location": None},
            {
                "$or": [
                    {"lock": {"$exists": False}},
                    {"lock.until": {"$lt": now - MAX_CLOCK_SKEW}},
                ]
            },
        ]
    },
    {"$set": {"lock": {"by": WORKER_ID, "until": lock_expiry, "token": token}}}
)

if result.modified_count == 0:
    return None

doc = messages.find_one(
    {"lock.by": WORKER_ID, "lock.until": lock_expiry, "lock.token": token}
)

if doc:
    print(f"[{WORKER_ID}] claimed IP {doc['originatingIp']} with token={token}")
else:
    print(f"[{WORKER_ID}] claim succeeded but fetch failed — possible race?")
return doc

공개 API 호출 (독립형 스니펫)

import requests
from typing import Optional, Dict, Any

def fetch_location(ip: str) -> Optional[Dict[str, Any]]:
    """
    ip‑api.com을 사용하여 IP 주소에 대한 지리 위치 정보를 가져옵니다.
    """
    url = f"http://ip-api.com/json/{ip}"
    try:
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            return response.json()
        print(f"[API] Error: HTTP {response.status_code} for {ip}")
        return None
    except Exception as exc:  # pragma: no cover
        print(f"[API] Exception for {ip}: {exc}")
        return None

루프에서 메시지 처리 (독립형 스니펫)

Back to Blog

관련 글

더 보기 »