Snapshots & Data Restore: Vector Database: Qdrant Cluster
Source: Dev.to
Overview
It is important to regularly verify, by hand, that Qdrant snapshots are valid and can actually be restored.
Vector databases are not as robust as traditional SQL databases, so you should never rely solely on automated backups.
The steps below walk you through:
- Creating a snapshot of a collection.
- Restoring a collection from a snapshot.
- Automating snapshot creation with an AWS Lambda function that also copies snapshots to an S3 bucket (or an EFS mount).
1️⃣ Manual Snapshot Workflow
A. Create a Snapshot
You can trigger a snapshot of an entire collection via Qdrant’s REST API or the Qdrant Dashboard.
REST API call
POST /collections/${collection_name}/snapshots
- Use your preferred REST client or the Qdrant Dashboard (e.g., https://my-custom-alb-domain.mycompany.com/dashboard).
- Important:
  - Include the API key/token for authentication.
  - Use HTTPS.
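For example, a minimal Python sketch of the snapshot call (the base URL and API key below are placeholders, not values prescribed by this article):

import requests

QDRANT_URL = "https://my-custom-alb-domain.mycompany.com"  # placeholder host
API_KEY = "<your-api-key>"                                 # placeholder secret
COLLECTION = "test_collection"

# Trigger a snapshot of the collection over HTTPS with the api-key header
resp = requests.post(
    f"{QDRANT_URL}/collections/{COLLECTION}/snapshots",
    headers={"api-key": API_KEY},
    timeout=60,
)
resp.raise_for_status()
print(resp.json())  # the result contains the generated snapshot name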
B. Verify the Snapshot
- Invoke the scroll endpoint to list points (or any other endpoint that returns data):
  POST /collections/${collection_name}/points/scroll
- Save the response to a temporary local file – this will be your reference for later comparison.
- Confirm that a new snapshot (we’ll call it snapshot #1 – the baseline) appears in the snapshot list.
  - Snapshots are named with a timestamp, making them easy to identify.
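The scroll-and-save step above could look like this in Python (collection name, file path, and scroll limit are illustrative assumptions):

import json
import requests

QDRANT_URL = "https://my-custom-alb-domain.mycompany.com"  # placeholder host
API_KEY = "<your-api-key>"
COLLECTION = "test_collection"
HEADERS = {"api-key": API_KEY}

# List points via the scroll endpoint and save the response for later comparison
scroll = requests.post(
    f"{QDRANT_URL}/collections/{COLLECTION}/points/scroll",
    headers=HEADERS,
    json={"limit": 100, "with_payload": True},
    timeout=60,
)
scroll.raise_for_status()
with open("/tmp/points_before.json", "w") as f:
    json.dump(scroll.json(), f, indent=2)

# List existing snapshots; the newest entry is snapshot #1 (the baseline)
snapshots = requests.get(
    f"{QDRANT_URL}/collections/{COLLECTION}/snapshots",
    headers=HEADERS,
    timeout=60,
)
snapshots.raise_for_status()
print(snapshots.json())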
C. Make a Change & Create a Second Snapshot
- Modify the collection (e.g., insert a new point):
  # Example for the test_collection
  PointStruct(
      id=6,
      vector=[0.30, 0.05, 0.10, 0.40],
      payload={"city": "Bengaluru"}
  )
- Trigger another snapshot (manual or scheduled).
- Run the scroll request again and compare the new response with the file you saved earlier.
- Verify that a new snapshot (snapshot #2) now exists.
FYI: Qdrant stores all snapshots under /qdrant/snapshots, but you should always use the snapshots API (e.g., GET /collections/${collection_name}/snapshots) to list, download, or upload snapshots rather than touching the filesystem directly.
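A sketch of that change using the Python qdrant-client (the URL and key are placeholders; the point mirrors the example above):

from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

client = QdrantClient(
    url="https://my-custom-alb-domain.mycompany.com",  # placeholder host
    api_key="<your-api-key>",                          # placeholder secret
)

# Insert the example point into test_collection
client.upsert(
    collection_name="test_collection",
    points=[
        PointStruct(
            id=6,
            vector=[0.30, 0.05, 0.10, 0.40],
            payload={"city": "Bengaluru"},
        )
    ],
)

# Trigger snapshot #2 for the same collection
snapshot_info = client.create_snapshot(collection_name="test_collection")
print(snapshot_info)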
D. Restore from a Snapshot
1. Restore the baseline (snapshot #1)
PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json
{
"location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
- Run a query to confirm that the change made in section C is missing.
2. Restore the latest snapshot (snapshot #2)
PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json
{
"location": "file:///qdrant/storage/snapshots/${collection_name}/${collection_name}-YYYY-MM-DD-HH-MM-SS.snapshot"
}
- Run a query to confirm that the change made in section C is present again.
3. Restore from an S3 location (example)
PUT /collections/${collection_name}/snapshots/recover
Content-Type: application/json
{
"location": "https://.s3.us-east-2.amazonaws.com/${collection_name}--2025-12-10-20-31-22.snapshot"
}
- Verify that the data reflects the state of the chosen snapshot.
Warning: The Fargate task (or container) role must have S3 read/write permissions. This requirement is outside the scope of this article.
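Putting the recover calls above into a small script might look like this (the snapshot file name and the location value are placeholders you would replace with your own):

import requests

QDRANT_URL = "https://my-custom-alb-domain.mycompany.com"  # placeholder host
API_KEY = "<your-api-key>"
COLLECTION = "test_collection"

# The location can be a file:// path on the Qdrant node or an HTTPS URL (e.g., an S3 object URL)
snapshot_location = (
    f"file:///qdrant/storage/snapshots/{COLLECTION}/"
    f"{COLLECTION}-YYYY-MM-DD-HH-MM-SS.snapshot"  # replace with a real snapshot name
)

resp = requests.put(
    f"{QDRANT_URL}/collections/{COLLECTION}/snapshots/recover",
    headers={"api-key": API_KEY},
    json={"location": snapshot_location},
    timeout=300,
)
resp.raise_for_status()
print(resp.json())

# Re-run the scroll request afterwards and diff the output against the file saved earlier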
2️⃣ Lambda Function – Periodic Snapshot & S3 Copy
The following Lambda code:
- Triggers snapshots on a configurable schedule.
- Copies each snapshot to an S3 bucket (or you can mount an EFS filesystem at /qdrant/snapshots/ and replicate it to another region). Note that the S3 copy only works if the Lambda can read the snapshot files, e.g., through that same EFS mount.
import os
import json
import boto3
import traceback
from datetime import datetime, timezone
import requests
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.metrics import MetricUnit
# --------------------------------------------------------------
logger = Logger()
tracer = Tracer()
metrics = Metrics()
s3_client = boto3.client('s3')
secrets_client = boto3.client('secretsmanager')
# --------------------------------------------------------------
def get_api_key(secrets_manager_arn: str) -> str:
"""Retrieve API key from AWS Secrets Manager."""
if not secrets_manager_arn or not secrets_manager_arn.strip():
raise ValueError("Secrets Manager ARN not provided")
logger.info(f"Retrieving API key from Secrets Manager ARN: {secrets_manager_arn}")
response = secrets_client.get_secret_value(SecretId=secrets_manager_arn)
api_key = response['SecretString'].strip()
if not api_key:
raise ValueError("API key not found in secret")
logger.info("API key retrieved successfully ✅")
return api_key
# --------------------------------------------------------------
def get_qdrant_url(qdrant_fqdn: str) -> str:
"""Construct Qdrant URL using Service Discovery FQDN."""
if not qdrant_fqdn:
raise ValueError("QDRANT_FQDN environment variable not set")
qdrant_url = f"http://{qdrant_fqdn}:6333"
logger.info(f"Qdrant URL: {qdrant_url}")
return qdrant_url
# --------------------------------------------------------------
def determine_snapshot_frequency() -> str:
"""
Determine which snapshot frequency to use based on current UTC time.
Returns: '15min', 'hourly', 'daily', or 'monthly'
"""
now = datetime.now(timezone.utc)
# Monthly: 1st of month at 8 AM UTC
if now.day == 1 and now.hour == 8 and now.minute == 0:
return "monthly"
# Daily: every day at 2 AM UTC
if now.hour == 2 and now.minute == 0:
return "daily"
# Hourly: at minute 15 of each hour
if now.minute == 15:
return "hourly"
# Default: every 15 minutes
return "15min"
# --------------------------------------------------------------
def get_collections(qdrant_url: str, api_key: str) -> list:
"""Retrieve a list of all collections."""
collections_url = f"{qdrant_url}/collections"
headers = {"api-key": api_key}
logger.info(f"Getting collections from {collections_url}")
response = requests.get(collections_url, headers=headers)
if response.status_code != 200:
raise Exception(
f"Failed to get collections: {response.status_code} - {response.text}"
)
collections_data = response.json()
collections = [
col["name"]
for col in collections_data.get("result", {}).get("collections", [])
]
logger.info(f"Found collections: {collections}")
return collections
# --------------------------------------------------------------
def trigger_snapshot(qdrant_url: str, collection: str, api_key: str) -> dict:
"""Create a snapshot for a specific collection."""
snapshot_url = f"{qdrant_url}/collections/{collection}/snapshots"
headers = {"api-key": api_key}
logger.info(f"Triggering snapshot for collection '{collection}'")
response = requests.post(snapshot_url, headers=headers)
if response.status_code != 200:
raise Exception(
f"Snapshot creation failed: {response.status_code} - {response.text}"
)
logger.info(f"Snapshot response: {response.json()}")
return response.json()
# --------------------------------------------------------------
def copy_snapshot_to_s3(snapshot_path: str, bucket: str, key: str):
"""Upload a local snapshot file to S3."""
logger.info(f"Uploading snapshot {snapshot_path} to s3://{bucket}/{key}")
s3_client.upload_file(snapshot_path, bucket, key)
logger.info("Upload complete ✅")
# --------------------------------------------------------------
# Powertools decorators: inject the EventBridge correlation ID, capture traces,
# and flush metrics on exit (requires the POWERTOOLS_SERVICE_NAME and
# POWERTOOLS_METRICS_NAMESPACE environment variables to be set).
@logger.inject_lambda_context(correlation_id_path=correlation_paths.EVENT_BRIDGE)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context):
"""Main Lambda entry point."""
try:
# Environment variables
secrets_arn = os.getenv("SECRETS_MANAGER_ARN")
qdrant_fqdn = os.getenv("QDRANT_FQDN")
s3_bucket = os.getenv("SNAPSHOT_S3_BUCKET")
snapshot_prefix = os.getenv("SNAPSHOT_S3_PREFIX", "qdrant-snapshots/")
# Retrieve configuration
api_key = get_api_key(secrets_arn)
qdrant_url = get_qdrant_url(qdrant_fqdn)
# Determine which collections to snapshot
collections = get_collections(qdrant_url, api_key)
# Loop through collections and snapshot each one
for coll in collections:
snap_resp = trigger_snapshot(qdrant_url, coll, api_key)
# The response contains the snapshot file name
snapshot_file = snap_resp["result"]["name"]
local_path = f"/qdrant/snapshots/{snapshot_file}"
# Optional: wait until the file appears on the filesystem
# (implementation omitted for brevity)
# Copy to S3
s3_key = f"{snapshot_prefix}{snapshot_file}"
copy_snapshot_to_s3(local_path, s3_bucket, s3_key)
# Emit a custom metric
metrics.add_metric(name="SnapshotsCreated", unit=MetricUnit.Count, value=1)
return {
"statusCode": 200,
"body": json.dumps({"message": "Snapshots created and uploaded successfully"})
}
except Exception as e:
logger.exception("Error in snapshot Lambda")
metrics.add_metric(name="SnapshotErrors", unit=MetricUnit.Count, value=1)
return {
"statusCode": 500,
"body": json.dumps({"error": str(e), "trace": traceback.format_exc()})
}
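Note that determine_snapshot_frequency() is defined but never called by the handler above. One possible way to wire it in (an assumption for illustration, not part of the original code) is to group uploads by frequency tier when building the S3 key inside the collection loop:

# Inside the collection loop: prefix the key with the frequency tier,
# e.g. qdrant-snapshots/hourly/<snapshot_file>
frequency = determine_snapshot_frequency()
s3_key = f"{snapshot_prefix}{frequency}/{snapshot_file}"
copy_snapshot_to_s3(local_path, s3_bucket, s3_key)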
How to Deploy
| Step | Description |
|---|---|
| 1 | Package the Lambda (including aws-lambda-powertools and requests). |
| 2 | Create an IAM role with permissions for Secrets Manager, S3, and (if you use EFS) EFS access. |
| 3 | Set the required environment variables (SECRETS_MANAGER_ARN, QDRANT_FQDN, SNAPSHOT_S3_BUCKET, SNAPSHOT_S3_PREFIX). |
| 4 | Configure a CloudWatch Events / EventBridge rule to invoke the Lambda on the desired schedule (e.g., every 15 minutes). |
| 5 | (Optional) Mount an EFS filesystem to /qdrant/snapshots/ in the Fargate task definition and enable cross‑region replication. |
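Step 4 could also be scripted with boto3, for instance (the rule name, Lambda ARN, and schedule below are assumptions for illustration):

import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

LAMBDA_ARN = "arn:aws:lambda:us-east-2:123456789012:function:qdrant-snapshot"  # placeholder
RULE_NAME = "qdrant-snapshot-every-15min"                                      # placeholder

# Create (or update) a scheduled rule that fires every 15 minutes
rule = events.put_rule(
    Name=RULE_NAME,
    ScheduleExpression="rate(15 minutes)",
    State="ENABLED",
)

# Allow EventBridge to invoke the Lambda
lambda_client.add_permission(
    FunctionName=LAMBDA_ARN,
    StatementId="allow-eventbridge-qdrant-snapshot",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule["RuleArn"],
)

# Point the rule at the Lambda function
events.put_targets(
    Rule=RULE_NAME,
    Targets=[{"Id": "qdrant-snapshot-lambda", "Arn": LAMBDA_ARN}],
)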
📌 Key Takeaways
- Never rely solely on automated snapshots – always verify that a snapshot can be restored.
- Use timestamps in snapshot names to identify the correct version.
- The Lambda example provides a repeatable, auditable way to create snapshots and back them up to S3 (or EFS).
- Ensure the container/Fargate task role has the necessary S3 (or EFS) permissions.
Happy snapshotting! 🚀