멀티스레드 파이썬 앱에서 SQLite 레이스 컨디션을 없앴습니다 🚀

발행: (2026년 2월 1일 오후 11:59 GMT+9)
8 min read
원문: Dev.to

Source: Dev.to

위에 제공된 소스 링크만으로는 번역할 본문 내용이 포함되어 있지 않습니다. 번역을 원하는 전체 텍스트를 제공해 주시면, 요청하신 대로 마크다운 형식과 코드 블록을 유지하면서 한국어로 번역해 드리겠습니다.

소개

무작위 충돌, 데이터베이스 손상, 그리고 “데이터베이스가 잠겨 있습니다” 오류가 여러 스레드가 동시에 SQLite에 접근할 때 내 앱 Sortify를 괴롭혔습니다. 이 글에서는 이러한 레이스 컨디션을 어떻게 제거하고 데이터베이스를 프로덕션‑준비 상태로 만들었는지 설명합니다.

문제

  • 단일 SQLite 연결이 여러 동시 구성 요소 간에 공유되었습니다:

    • Auto‑sort watcher
    • Manual file operations
    • Scheduler tasks
    • Background processing threads
  • 증상:

    • 무작위 충돌
    • database is locked 오류
    • 일관성 없는 히스토리 데이터
    • 데이터베이스 손상 위험
    • 부하가 걸릴 때 전반적인 앱 불안정

문제의 원인은 다음 코드였습니다:

sqlite3.connect(db_path, check_same_thread=False)

check_same_thread=False는 SQLite의 내장 스레드 안전성을 비활성화하지만 실제로 연결을 스레드‑안전하게 만들지는 않습니다. SQLite는 여러 연결을 허용하지만 각 연결은 생성된 스레드 내에서만 사용되어야 합니다. 커서, 연결 또는 트랜잭션을 스레드 간에 공유하면 경쟁 상태가 발생합니다.

솔루션 아키텍처

스레드마다 자체적인 안전한 연결을 제공하고, 자동 재시도 로직과 중앙 집중식 접근 레이어를 갖춘 스레드‑로컬 데이터베이스 관리자를 도입했습니다.

스레드‑로컬 연결

# core/database_manager.py
import threading
import sqlite3
import time
from typing import Callable, Any, List

class DatabaseManager:
    def __init__(self, db_path: str, timeout: float = 10.0):
        self.db_path = db_path
        self.timeout = timeout
        self._local = threading.local()

    def _get_connection(self) -> sqlite3.Connection:
        """Create or retrieve the connection bound to the current thread."""
        if not hasattr(self._local, "conn"):
            self._local.conn = sqlite3.connect(
                self.db_path,
                timeout=self.timeout,
                check_same_thread=True  # ✅ safe
            )
        return self._local.conn

    def execute(self, query: str, params: tuple = ()) -> sqlite3.Cursor:
        """Execute a single statement with automatic retry on lock."""
        return self._retry(lambda: self._get_connection().execute(query, params))

    def execute_transaction(self, operations: List[Callable[[sqlite3.Connection], Any]]) -> None:
        """Run a series of operations atomically."""
        conn = self._get_connection()
        try:
            conn.execute("BEGIN")
            for op in operations:
                op(conn)
            conn.commit()
        except Exception:
            conn.rollback()
            raise

    def close_all_connections(self) -> None:
        """Close the connection for the current thread."""
        if hasattr(self._local, "conn"):
            self._local.conn.close()
            del self._local.conn

    # ---- internal helpers -------------------------------------------------
    def _retry(self, func: Callable[[], Any], retries: int = 5, backoff: float = 0.1) -> Any:
        """Retry a DB operation when the database is locked."""
        for attempt in range(retries):
            try:
                return func()
            except sqlite3.OperationalError as e:
                if "database is locked" not in str(e):
                    raise
                time.sleep(backoff * (2 ** attempt))
        raise sqlite3.OperationalError("Maximum retries exceeded: database is locked")

작동 방식

  • 스레드‑로컬 스토리지(threading.local())를 사용해 각 스레드가 자체 sqlite3.Connection을 갖도록 보장합니다.
  • 연결은 필요 시 생성되며 동일한 스레드 내에서 재사용됩니다.
  • check_same_thread=True는 SQLite의 안전 보장을 강제합니다.
  • execute 메서드는 database is locked 오류가 발생하면 지수 백오프를 적용해 자동으로 재시도합니다.
  • execute_transaction은 높은 부하 상황에서도 원자적인 쓰기를 보장합니다.
  • close_all_connections는 파일 핸들 누수를 방지합니다.

기존 코드 리팩터링

# Example usage in other modules
from .database_manager import DatabaseManager

db_manager = DatabaseManager("/path/to/sortify.db")

# Simple query
cursor = db_manager.execute("SELECT * FROM history WHERE id = ?", (record_id,))
rows = cursor.fetchall()

# Transactional update
def delete_history(conn):
    conn.execute("DELETE FROM history WHERE user_id = ?", (user_id,))

db_manager.execute_transaction([delete_history])

이제 모든 데이터베이스 작업이 하나의 안전한 게이트웨이를 통해 이루어지므로, 숨겨진 레이스 컨디션이 사라집니다.

스트레스 테스트

스레드 안전성을 검증하기 위해 스트레스‑테스트 스위트를 작성했습니다:

# tests/test_database_threading.py
import threading
from core.database_manager import DatabaseManager

def worker(db_path, ops):
    mgr = DatabaseManager(db_path)
    for i in range(ops):
        mgr.execute("INSERT INTO test (value) VALUES (?)", (i,))

def test_threaded_writes():
    db_path = "test.db"
    threads = []
    for _ in range(5):
        t = threading.Thread(target=worker, args=(db_path, 50))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    # Verify results
    mgr = DatabaseManager(db_path)
    count = mgr.execute("SELECT COUNT(*) FROM test").fetchone()[0]
    assert count == 250

결과

메트릭
시도된 총 작업 수250
성공한 작업 수250
실패한 작업 수0
사용된 고유 연결 수5 (스레드당 하나)
데이터베이스 손상없음

테스트 결과 실패 0건, 잠금 0건, 손상 없음을 확인했습니다.

파일 변경 개요

파일설명
core/database_manager.py새로운 스레드‑안전 DB 레이어
core/history.py모든 쿼리를 DatabaseManager 사용으로 마이그레이션
ui/main_window.py직접 DB 접근을 제거하고 이제 매니저 사용
tests/test_database_threading.py동시 쓰기를 위한 스트레스‑테스트 스위트

Key Takeaways

  • SQLite는 스레드 친화적이지만 스레드 안전하지 않다.
  • check_same_thread=False는 흔한 함정이다; 자체 잠금을 구현하지 않는 한 피하라.
  • 올바른 모델은 스레드당 하나의 연결이다.
  • 매니저를 통해 DB 접근을 중앙화하면 향후 버그를 방지하고 유지보수가 간소화된다.
  • 실제 동시성을 시뮬레이션하는 스트레스 테스트가 필수이며, 단위 테스트만으로는 경쟁 조건을 드러내지 못한다.

저장소

전체 구현은 GitHub에서 확인할 수 있습니다:

https://github.com/Mrtracker-new/Sortify

결론

Python 애플리케이션이 SQLite를 사용하고 백그라운드 스레드를 실행한다면, 위 패턴은 무작위 충돌, 데이터베이스 잠금 오류 및 데이터 손상을 방지해 줍니다. 스레드‑로컬 DatabaseManager를 구현하고, 재시도 로직을 추가하며, 모든 DB 상호작용을 중앙 집중화하여 안정적이고 프로덕션‑준비된 솔루션을 만들세요. 즐거운 코딩 되세요! 🚀

Back to Blog

관련 글

더 보기 »

내 2026 개발자 포트폴리오

소개 안녕하세요! 저는 Python, AI, web development 분야에 열정적인 개발자이자 교육자인 Ahmed Anter Elsayed입니다. Live Portfolio 제 Live Portfolio를 확인해 보세요.

Java 가상 스레드 — 빠른 가이드

Java Virtual Threads — 빠른 가이드 Java 21+ · Spring Boot 3.2+ · Project Loom Java Virtual Threads에 대한 간결하고 실무 중심의 가이드 — 무엇이며, 어떻게…