스냅샷 및 데이터 복구: 벡터 데이터베이스: Qdrant 클러스터
Source: Dev.to
번역할 텍스트가 제공되지 않았습니다. 번역이 필요한 전체 내용을 알려주시면 한국어로 번역해 드리겠습니다.
Overview
It is very important to frequently test the validity of Qdrant snapshots manually.
Vector‑databases are not as robust as traditional SQL databases, so you should never rely solely on automated backups.
The steps below walk you through:
- Creating a snapshot of a collection.
- Restoring a collection from a snapshot.
- Automating snapshot creation with an AWS Lambda function that also copies snapshots to an S3 bucket (or an EFS mount).
1️⃣ Manual Snapshot Workflow
A. 스냅샷 만들기
전체 컬렉션의 스냅샷을 Qdrant의 REST API 또는 Qdrant 대시보드를 통해 트리거할 수 있습니다.
REST API 호출
POST /collections/${collection_name}/snapshots
- 선호하는 REST 클라이언트 또는 Qdrant 대시보드(예:
https://my-custom-alb-domain.mycompany.com/dashboard)를 사용하세요. - 중요:
- 인증을 위해 API‑Key/Token을 포함합니다.
- HTTPS를 사용합니다.
B. 스냅샷 확인
-
스크롤 엔드포인트를 호출하여 포인트 목록(또는 데이터를 반환하는 다른 엔드포인트)을 확인합니다.
POST /collections/${collection_name}/points/scroll -
응답을 임시 로컬 파일에 저장합니다 – 나중에 비교할 기준이 됩니다.
-
새 스냅샷(우리는 이를 snapshot #1 – 베이스라인이라고 부릅니다)이 목록에 나타나는지 확인합니다.
- 스냅샷은 타임스탬프로 이름이 지정되어 쉽게 식별할 수 있습니다.
C. 변경 사항 적용 및 두 번째 스냅샷 만들기
-
컬렉션을 수정합니다(예: 새 포인트 삽입).
# Example for the test_collection PointStruct( id=6, vector=[0.30, 0.05, 0.10, 0.40], payload={"city": "Bengaluru"} ) -
다른 스냅샷을 트리거합니다(수동 또는 예약).
-
스크롤 요청을 다시 실행하고 이전에 저장한 파일과 새 응답을 비교합니다.
-
새 스냅샷(이름은 snapshot #2)이 이제 존재하는지 확인합니다.
FYI: Qdrant는 모든 스냅샷을
/qdrant/snapshots에 저장하지만, 스냅샷을 나열, 다운로드 또는 업로드할 때는 항상 API(/scroll)를 사용해야 합니다.
D. 스냅샷에서 복원
1. 베이스라인(snapshot #1) 복원
PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json
{
"location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
- 섹션 C에서 만든 변경 사항이 없음을 확인하기 위해 쿼리를 실행합니다.
2. 최신 스냅샷(snapshot #2) 복원
PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json
{
"location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
- 섹션 C에서 만든 변경 사항이 다시 존재함을 확인하기 위해 쿼리를 실행합니다.
3. S3 위치에서 복원 (예시)
PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json
{
"location": "https://.s3.us-east-2.amazonaws.com/${collection_name}--2025-12-10-20-31-22.snapshot"
}
- 선택한 스냅샷의 상태가 데이터에 반영되는지 확인합니다.
Warning: Fargate 작업(또는 컨테이너) 역할에 S3 읽기/쓰기 권한이 있어야 합니다. 이 요구사항은 본 문서의 범위를 벗어납니다.
2️⃣ Lambda Function – 주기적인 스냅샷 및 S3 복사
다음 Lambda 코드:
- 설정 가능한 일정에 따라 스냅샷을 트리거합니다.
- 각 스냅샷을 S3 버킷에 복사합니다(또는 EFS 파일 시스템을
/qdrant/snapshots/에 마운트하고 다른 리전으로 복제할 수 있습니다).
import os
import json
import boto3
import traceback
from datetime import datetime, timezone
import requests
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.metrics import MetricUnit
# --------------------------------------------------------------
logger = Logger()
tracer = Tracer()
metrics = Metrics()
s3_client = boto3.client('s3')
secrets_client = boto3.client('secretsmanager')
# --------------------------------------------------------------
def get_api_key(secrets_manager_arn: str) -> str:
"""Retrieve API key from AWS Secrets Manager."""
if not secrets_manager_arn or not secrets_manager_arn.strip():
raise ValueError("Secrets Manager ARN not provided")
logger.info(f"Retrieving API key from Secrets Manager ARN: {secrets_manager_arn}")
response = secrets_client.get_secret_value(SecretId=secrets_manager_arn)
api_key = response['SecretString'].strip()
if not api_key:
raise ValueError("API key not found in secret")
logger.info("API key retrieved successfully ✅")
return api_key
# --------------------------------------------------------------
def get_qdrant_url(qdrant_fqdn: str) -> str:
"""Construct Qdrant URL using Service Discovery FQDN."""
if not qdrant_fqdn:
raise ValueError("QDRANT_FQDN environment variable not set")
qdrant_url = f"http://{qdrant_fqdn}:6333"
logger.info(f"Qdrant URL: {qdrant_url}")
return qdrant_url
# --------------------------------------------------------------
def determine_snapshot_frequency() -> str:
"""
Determine which snapshot frequency to use based on current UTC time.
Returns: '15min', 'hourly', 'daily', or 'monthly'
"""
now = datetime.now(timezone.utc)
# Monthly: 1st of month at 8 AM UTC
if now.day == 1 and now.hour == 8 and now.minute == 0:
return "monthly"
# Daily: every day at 2 AM UTC
if now.hour == 2 and now.minute == 0:
return "daily"
# Hourly: at minute 15 of each hour
if now.minute == 15:
return "hourly"
# Default: every 15 minutes
return "15min"
# --------------------------------------------------------------
def get_collections(qdrant_url: str, api_key: str) -> list:
"""Retrieve a list of all collections."""
collections_url = f"{qdrant_url}/collections"
headers = {"api-key": api_key}
logger.info(f"Getting collections from {collections_url}")
response = requests.get(collections_url, headers=headers)
if response.status_code != 200:
raise Exception(
f"Failed to get collections: {response.status_code} - {response.text}"
)
collections_data = response.json()
collections = [
col["name"]
for col in collections_data.get("result", {}).get("collections", [])
]
logger.info(f"Found collections: {collections}")
return collections
# --------------------------------------------------------------
def trigger_snapshot(qdrant_url: str, collection: str, api_key: str) -> dict:
"""Create a snapshot for a specific collection."""
snapshot_url = f"{qdrant_url}/collections/{collection}/snapshots"
headers = {"api-key": api_key}
logger.info(f"Triggering snapshot for collection '{collection}'")
response = requests.post(snapshot_url, headers=headers)
if response.status_code != 200:
raise Exception(
f"Snapshot creation failed: {response.status_code} - {response.text}"
)
logger.info(f"Snapshot response: {response.json()}")
return response.json()
# --------------------------------------------------------------
def copy_snapshot_to_s
3(snapshot_path: str, bucket: str, key: str):
"""Upload a local snapshot file to S3."""
logger.info(f"Uploading snapshot {snapshot_path} to s3://{bucket}/{key}")
s3_client.upload_file(snapshot_path, bucket, key)
logger.info("Upload complete ✅")
# --------------------------------------------------------------
def lambda_handler(event, context):
"""Main Lambda entry point."""
try:
# Environment variables
secrets_arn = os.getenv("SECRETS_MANAGER_ARN")
qdrant_fqdn = os.getenv("QDRANT_FQDN")
s3_bucket = os.getenv("SNAPSHOT_S3_BUCKET")
snapshot_prefix = os.getenv("SNAPSHOT_S3_PREFIX", "qdrant-snapshots/")
# Retrieve configuration
api_key = get_api_key(secrets_arn)
qdrant_url = get_qdrant_url(qdrant_fqdn)
# Determine which collections to snapshot
collections = get_collections(qdrant_url, api_key)
# Loop through collections and snapshot each one
for coll in collections:
snap_resp = trigger_snapshot(qdrant_url, coll, api_key)
# The response contains the snapshot file name
snapshot_file = snap_resp["result"]["name"]
local_path = f"/qdrant/snapshots/{snapshot_file}"
# Optional: wait until the file appears on the filesystem
# (implementation omitted for brevity)
# Copy to S3
s3_key = f"{snapshot_prefix}{snapshot_file}"
copy_snapshot_to_s3(local_path, s3_bucket, s3_key)
# Emit a custom metric
metrics.add_metric(name="SnapshotsCreated", unit=MetricUnit.Count, value=1)
return {
"statusCode": 200,
"body": json.dumps({"message": "Snapshots created and uploaded successfully"})
}
except Exception as e:
logger.exception("Error in snapshot Lambda")
metrics.add_metric(name="SnapshotErrors", unit=MetricUnit.Count, value=1)
return {
"statusCode": 500,
"body": json.dumps({"error": str(e), "trace": traceback.format_exc()})
}
배포 방법
| 단계 | 설명 |
|---|---|
| 1 | Lambda를 패키징합니다 (aws-lambda-powertools와 requests 포함). |
| 2 | Secrets Manager, S3, 그리고 (EFS를 사용하는 경우) EFS 액세스 권한이 있는 IAM 역할을 생성합니다. |
| 3 | 필수 환경 변수(SECRETS_MANAGER_ARN, QDRANT_FQDN, SNAPSHOT_S3_BUCKET, SNAPSHOT_S3_PREFIX)를 설정합니다. |
| 4 | 원하는 일정(예: 15분마다)으로 Lambda를 호출하도록 CloudWatch Events / EventBridge 규칙을 구성합니다. |
| 5 | (선택) Fargate 작업 정의에서 /qdrant/snapshots/에 EFS 파일 시스템을 마운트하고 교차‑리전 복제를 활성화합니다. |
📌 핵심 요점
- 자동 스냅샷에만 의존하지 마세요 – 스냅샷을 복원할 수 있는지 항상 확인하세요.
- 스냅샷 이름에 타임스탬프를 사용해 올바른 버전을 식별하세요.
- Lambda 예시는 스냅샷을 생성하고 S3(또는 EFS)로 백업하는 반복 가능하고 감사 가능한 방법을 제공합니다.
- 컨테이너/Fargate 작업 역할에 필요한 S3(또는 EFS) 권한이 있는지 확인하세요.
스냅샷 즐겁게 사용하세요! 🚀