快照 & 数据恢复: 向量数据库: Qdrant 集群

发布: (2025年12月31日 GMT+8 11:31)
7 min read
原文: Dev.to

Source: Dev.to

Overview

非常重要的是要经常手动测试 Qdrant 快照的有效性。
向量数据库的可靠性不如传统的 SQL 数据库,因此绝不能仅依赖自动备份。

下面的步骤将带您完成:

  1. 创建 集合的快照。
  2. 从快照恢复 集合。
  3. 使用 AWS Lambda 函数自动创建快照,并将快照复制到 S3 存储桶(或 EFS 挂载点)。

1️⃣ 手动快照工作流

A. 创建快照

您可以通过 Qdrant 的 REST API 或 Qdrant 仪表板触发整个集合的快照。

REST API 调用

POST /collections/${collection_name}/snapshots
  • 使用您喜欢的 REST 客户端 Qdrant 仪表板(例如 https://my-custom-alb-domain.mycompany.com/dashboard)。
  • 重要提示:
    • 包含用于身份验证的 API‑Key/Token
    • 使用 HTTPS

B. 验证快照

  1. 调用 scroll 接口列出点(或任何返回数据的其他接口)。

    POST /collections/${collection_name}/points/scroll
  2. 将响应保存到本地临时文件——这将作为以后比较的参考。

  3. 确认 新快照(我们称之为 snapshot #1 —— 基线)出现在列表中。

    • 快照名称带有时间戳,便于识别。

C. 进行更改并创建第二个快照

  1. 修改集合(例如,插入一个新点)。

    # Example for the test_collection
    PointStruct(
        id=6,
        vector=[0.30, 0.05, 0.10, 0.40],
        payload={"city": "Bengaluru"}
    )
  2. 触发 另一个快照(手动或计划)。

  3. 再次运行 scroll 请求,并将新响应与之前保存的文件进行比较。

  4. 验证 新快照(命名为 snapshot #2)已存在。

FYI: Qdrant 将所有快照存放在 /qdrant/snapshots 下,但您应始终使用 API(/scroll)来列出、下载或上传快照。

D. 从快照恢复

1. 恢复 基线(snapshot #1)

PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json

{
  "location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
  • 运行查询以确认 C 节 中的更改已 消失

2. 恢复 最新 快照(snapshot #2)

PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json

{
  "location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
  • 运行查询以确认 C 节 中的更改已 再次出现

3. 从 S3 位置恢复(示例)

PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json

{
  "location": "https://.s3.us-east-2.amazonaws.com/${collection_name}--2025-12-10-20-31-22.snapshot"
}
  • 验证数据是否反映所选快照的状态。

警告: Fargate 任务(或容器)角色必须具备 S3 读写权限。此要求不在本文范围内。

2️⃣ Lambda Function – 定期快照 & S3 复制

以下 Lambda 代码:

  • 在可配置的计划上触发快照。
  • 将每个快照复制到 S3 存储桶(或者您可以在 /qdrant/snapshots/ 挂载 EFS 文件系统并将其复制到另一个区域)。
import os
import json
import boto3
import traceback
from datetime import datetime, timezone

import requests

from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.metrics import MetricUnit

# --------------------------------------------------------------
logger = Logger()
tracer = Tracer()
metrics = Metrics()

s3_client = boto3.client('s3')
secrets_client = boto3.client('secretsmanager')

# --------------------------------------------------------------
def get_api_key(secrets_manager_arn: str) -> str:
    """Retrieve API key from AWS Secrets Manager."""
    if not secrets_manager_arn or not secrets_manager_arn.strip():
        raise ValueError("Secrets Manager ARN not provided")

    logger.info(f"Retrieving API key from Secrets Manager ARN: {secrets_manager_arn}")
    response = secrets_client.get_secret_value(SecretId=secrets_manager_arn)
    api_key = response['SecretString'].strip()

    if not api_key:
        raise ValueError("API key not found in secret")

    logger.info("API key retrieved successfully ✅")
    return api_key

# --------------------------------------------------------------
def get_qdrant_url(qdrant_fqdn: str) -> str:
    """Construct Qdrant URL using Service Discovery FQDN."""
    if not qdrant_fqdn:
        raise ValueError("QDRANT_FQDN environment variable not set")

    qdrant_url = f"http://{qdrant_fqdn}:6333"
    logger.info(f"Qdrant URL: {qdrant_url}")
    return qdrant_url

# --------------------------------------------------------------
def determine_snapshot_frequency() -> str:
    """
    Determine which snapshot frequency to use based on current UTC time.
    Returns: '15min', 'hourly', 'daily', or 'monthly'
    """
    now = datetime.now(timezone.utc)

    # Monthly: 1st of month at 8 AM UTC
    if now.day == 1 and now.hour == 8 and now.minute == 0:
        return "monthly"

    # Daily: every day at 2 AM UTC
    if now.hour == 2 and now.minute == 0:
        return "daily"

    # Hourly: at minute 15 of each hour
    if now.minute == 15:
        return "hourly"

    # Default: every 15 minutes
    return "15min"

# --------------------------------------------------------------
def get_collections(qdrant_url: str, api_key: str) -> list:
    """Retrieve a list of all collections."""
    collections_url = f"{qdrant_url}/collections"
    headers = {"api-key": api_key}

    logger.info(f"Getting collections from {collections_url}")
    response = requests.get(collections_url, headers=headers)

    if response.status_code != 200:
        raise Exception(
            f"Failed to get collections: {response.status_code} - {response.text}"
        )

    collections_data = response.json()
    collections = [
        col["name"]
        for col in collections_data.get("result", {}).get("collections", [])
    ]
    logger.info(f"Found collections: {collections}")
    return collections

# --------------------------------------------------------------
def trigger_snapshot(qdrant_url: str, collection: str, api_key: str) -> dict:
    """Create a snapshot for a specific collection."""
    snapshot_url = f"{qdrant_url}/collections/{collection}/snapshots"
    headers = {"api-key": api_key}
    logger.info(f"Triggering snapshot for collection '{collection}'")
    response = requests.post(snapshot_url, headers=headers)

    if response.status_code != 200:
        raise Exception(
            f"Snapshot creation failed: {response.status_code} - {response.text}"
        )
    logger.info(f"Snapshot response: {response.json()}")
    return response.json()

# --------------------------------------------------------------
def copy_snapshot_to_s
3(snapshot_path: str, bucket: str, key: str):
    """Upload a local snapshot file to S3."""
    logger.info(f"Uploading snapshot {snapshot_path} to s3://{bucket}/{key}")
    s3_client.upload_file(snapshot_path, bucket, key)
    logger.info("Upload complete ✅")

# --------------------------------------------------------------
def lambda_handler(event, context):
    """Main Lambda entry point."""
    try:
        # Environment variables
        secrets_arn = os.getenv("SECRETS_MANAGER_ARN")
        qdrant_fqdn = os.getenv("QDRANT_FQDN")
        s3_bucket = os.getenv("SNAPSHOT_S3_BUCKET")
        snapshot_prefix = os.getenv("SNAPSHOT_S3_PREFIX", "qdrant-snapshots/")

        # Retrieve configuration
        api_key = get_api_key(secrets_arn)
        qdrant_url = get_qdrant_url(qdrant_fqdn)

        # Determine which collections to snapshot
        collections = get_collections(qdrant_url, api_key)

        # Loop through collections and snapshot each one
        for coll in collections:
            snap_resp = trigger_snapshot(qdrant_url, coll, api_key)

            # The response contains the snapshot file name
            snapshot_file = snap_resp["result"]["name"]
            local_path = f"/qdrant/snapshots/{snapshot_file}"

            # Optional: wait until the file appears on the filesystem
            # (implementation omitted for brevity)

            # Copy to S3
            s3_key = f"{snapshot_prefix}{snapshot_file}"
            copy_snapshot_to_s3(local_path, s3_bucket, s3_key)

        # Emit a custom metric
        metrics.add_metric(name="SnapshotsCreated", unit=MetricUnit.Count, value=1)
        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Snapshots created and uploaded successfully"})
        }

    except Exception as e:
        logger.exception("Error in snapshot Lambda")
        metrics.add_metric(name="SnapshotErrors", unit=MetricUnit.Count, value=1)
        return {
            "statusCode": 500,
            "body": json.dumps({"error": str(e), "trace": traceback.format_exc()})
        }

如何部署

步骤描述
1打包 Lambda(包括 aws-lambda-powertoolsrequests)。
2创建具有 Secrets ManagerS3 权限的 IAM 角色,以及(如果使用 EFS)EFS 访问 权限。
3设置所需的环境变量(SECRETS_MANAGER_ARNQDRANT_FQDNSNAPSHOT_S3_BUCKETSNAPSHOT_S3_PREFIX)。
4配置 CloudWatch Events / EventBridge 规则,以在所需的时间表(例如每 15 分钟)触发 Lambda。
5(可选)在 Fargate 任务定义中将 EFS 文件系统挂载到 /qdrant/snapshots/,并启用跨区域复制。

📌 关键要点

  • 永远不要仅依赖自动快照——始终验证快照是否可以恢复。
  • 在快照名称中使用 时间戳 以识别正确的版本。
  • Lambda 示例提供了一种可重复、可审计的方式来创建快照并将其备份到 S3(或 EFS)。
  • 确保 容器/Fargate 任务角色 拥有必要的 S3(或 EFS)权限。

祝快照顺利! 🚀

Back to Blog

相关文章

阅读更多 »