快照 & 数据恢复: 向量数据库: Qdrant 集群
Source: Dev.to
Overview
非常重要的是要经常手动测试 Qdrant 快照的有效性。
向量数据库的可靠性不如传统的 SQL 数据库,因此绝不能仅依赖自动备份。
下面的步骤将带您完成:
- 创建 集合的快照。
- 从快照恢复 集合。
- 使用 AWS Lambda 函数自动创建快照,并将快照复制到 S3 存储桶(或 EFS 挂载点)。
1️⃣ 手动快照工作流
A. 创建快照
您可以通过 Qdrant 的 REST API 或 Qdrant 仪表板触发整个集合的快照。
REST API 调用
POST /collections/${collection_name}/snapshots
- 使用您喜欢的 REST 客户端 或 Qdrant 仪表板(例如
https://my-custom-alb-domain.mycompany.com/dashboard)。 - 重要提示:
- 包含用于身份验证的 API‑Key/Token。
- 使用 HTTPS。
B. 验证快照
-
调用 scroll 接口列出点(或任何返回数据的其他接口)。
POST /collections/${collection_name}/points/scroll -
将响应保存到本地临时文件——这将作为以后比较的参考。
-
确认 新快照(我们称之为 snapshot #1 —— 基线)出现在列表中。
- 快照名称带有时间戳,便于识别。
C. 进行更改并创建第二个快照
-
修改集合(例如,插入一个新点)。
# Example for the test_collection PointStruct( id=6, vector=[0.30, 0.05, 0.10, 0.40], payload={"city": "Bengaluru"} ) -
触发 另一个快照(手动或计划)。
-
再次运行 scroll 请求,并将新响应与之前保存的文件进行比较。
-
验证 新快照(命名为 snapshot #2)已存在。
FYI: Qdrant 将所有快照存放在
/qdrant/snapshots下,但您应始终使用 API(/scroll)来列出、下载或上传快照。
D. 从快照恢复
1. 恢复 基线(snapshot #1)
PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json
{
"location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
- 运行查询以确认 C 节 中的更改已 消失。
2. 恢复 最新 快照(snapshot #2)
PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json
{
"location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
- 运行查询以确认 C 节 中的更改已 再次出现。
3. 从 S3 位置恢复(示例)
PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json
{
"location": "https://.s3.us-east-2.amazonaws.com/${collection_name}--2025-12-10-20-31-22.snapshot"
}
- 验证数据是否反映所选快照的状态。
警告: Fargate 任务(或容器)角色必须具备 S3 读写权限。此要求不在本文范围内。
2️⃣ Lambda Function – 定期快照 & S3 复制
以下 Lambda 代码:
- 在可配置的计划上触发快照。
- 将每个快照复制到 S3 存储桶(或者您可以在
/qdrant/snapshots/挂载 EFS 文件系统并将其复制到另一个区域)。
import os
import json
import boto3
import traceback
from datetime import datetime, timezone
import requests
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.metrics import MetricUnit
# --------------------------------------------------------------
logger = Logger()
tracer = Tracer()
metrics = Metrics()
s3_client = boto3.client('s3')
secrets_client = boto3.client('secretsmanager')
# --------------------------------------------------------------
def get_api_key(secrets_manager_arn: str) -> str:
"""Retrieve API key from AWS Secrets Manager."""
if not secrets_manager_arn or not secrets_manager_arn.strip():
raise ValueError("Secrets Manager ARN not provided")
logger.info(f"Retrieving API key from Secrets Manager ARN: {secrets_manager_arn}")
response = secrets_client.get_secret_value(SecretId=secrets_manager_arn)
api_key = response['SecretString'].strip()
if not api_key:
raise ValueError("API key not found in secret")
logger.info("API key retrieved successfully ✅")
return api_key
# --------------------------------------------------------------
def get_qdrant_url(qdrant_fqdn: str) -> str:
"""Construct Qdrant URL using Service Discovery FQDN."""
if not qdrant_fqdn:
raise ValueError("QDRANT_FQDN environment variable not set")
qdrant_url = f"http://{qdrant_fqdn}:6333"
logger.info(f"Qdrant URL: {qdrant_url}")
return qdrant_url
# --------------------------------------------------------------
def determine_snapshot_frequency() -> str:
"""
Determine which snapshot frequency to use based on current UTC time.
Returns: '15min', 'hourly', 'daily', or 'monthly'
"""
now = datetime.now(timezone.utc)
# Monthly: 1st of month at 8 AM UTC
if now.day == 1 and now.hour == 8 and now.minute == 0:
return "monthly"
# Daily: every day at 2 AM UTC
if now.hour == 2 and now.minute == 0:
return "daily"
# Hourly: at minute 15 of each hour
if now.minute == 15:
return "hourly"
# Default: every 15 minutes
return "15min"
# --------------------------------------------------------------
def get_collections(qdrant_url: str, api_key: str) -> list:
"""Retrieve a list of all collections."""
collections_url = f"{qdrant_url}/collections"
headers = {"api-key": api_key}
logger.info(f"Getting collections from {collections_url}")
response = requests.get(collections_url, headers=headers)
if response.status_code != 200:
raise Exception(
f"Failed to get collections: {response.status_code} - {response.text}"
)
collections_data = response.json()
collections = [
col["name"]
for col in collections_data.get("result", {}).get("collections", [])
]
logger.info(f"Found collections: {collections}")
return collections
# --------------------------------------------------------------
def trigger_snapshot(qdrant_url: str, collection: str, api_key: str) -> dict:
"""Create a snapshot for a specific collection."""
snapshot_url = f"{qdrant_url}/collections/{collection}/snapshots"
headers = {"api-key": api_key}
logger.info(f"Triggering snapshot for collection '{collection}'")
response = requests.post(snapshot_url, headers=headers)
if response.status_code != 200:
raise Exception(
f"Snapshot creation failed: {response.status_code} - {response.text}"
)
logger.info(f"Snapshot response: {response.json()}")
return response.json()
# --------------------------------------------------------------
def copy_snapshot_to_s
3(snapshot_path: str, bucket: str, key: str):
"""Upload a local snapshot file to S3."""
logger.info(f"Uploading snapshot {snapshot_path} to s3://{bucket}/{key}")
s3_client.upload_file(snapshot_path, bucket, key)
logger.info("Upload complete ✅")
# --------------------------------------------------------------
def lambda_handler(event, context):
"""Main Lambda entry point."""
try:
# Environment variables
secrets_arn = os.getenv("SECRETS_MANAGER_ARN")
qdrant_fqdn = os.getenv("QDRANT_FQDN")
s3_bucket = os.getenv("SNAPSHOT_S3_BUCKET")
snapshot_prefix = os.getenv("SNAPSHOT_S3_PREFIX", "qdrant-snapshots/")
# Retrieve configuration
api_key = get_api_key(secrets_arn)
qdrant_url = get_qdrant_url(qdrant_fqdn)
# Determine which collections to snapshot
collections = get_collections(qdrant_url, api_key)
# Loop through collections and snapshot each one
for coll in collections:
snap_resp = trigger_snapshot(qdrant_url, coll, api_key)
# The response contains the snapshot file name
snapshot_file = snap_resp["result"]["name"]
local_path = f"/qdrant/snapshots/{snapshot_file}"
# Optional: wait until the file appears on the filesystem
# (implementation omitted for brevity)
# Copy to S3
s3_key = f"{snapshot_prefix}{snapshot_file}"
copy_snapshot_to_s3(local_path, s3_bucket, s3_key)
# Emit a custom metric
metrics.add_metric(name="SnapshotsCreated", unit=MetricUnit.Count, value=1)
return {
"statusCode": 200,
"body": json.dumps({"message": "Snapshots created and uploaded successfully"})
}
except Exception as e:
logger.exception("Error in snapshot Lambda")
metrics.add_metric(name="SnapshotErrors", unit=MetricUnit.Count, value=1)
return {
"statusCode": 500,
"body": json.dumps({"error": str(e), "trace": traceback.format_exc()})
}
如何部署
| 步骤 | 描述 |
|---|---|
| 1 | 打包 Lambda(包括 aws-lambda-powertools 和 requests)。 |
| 2 | 创建具有 Secrets Manager、S3 权限的 IAM 角色,以及(如果使用 EFS)EFS 访问 权限。 |
| 3 | 设置所需的环境变量(SECRETS_MANAGER_ARN、QDRANT_FQDN、SNAPSHOT_S3_BUCKET、SNAPSHOT_S3_PREFIX)。 |
| 4 | 配置 CloudWatch Events / EventBridge 规则,以在所需的时间表(例如每 15 分钟)触发 Lambda。 |
| 5 | (可选)在 Fargate 任务定义中将 EFS 文件系统挂载到 /qdrant/snapshots/,并启用跨区域复制。 |
📌 关键要点
- 永远不要仅依赖自动快照——始终验证快照是否可以恢复。
- 在快照名称中使用 时间戳 以识别正确的版本。
- Lambda 示例提供了一种可重复、可审计的方式来创建快照并将其备份到 S3(或 EFS)。
- 确保 容器/Fargate 任务角色 拥有必要的 S3(或 EFS)权限。
祝快照顺利! 🚀