Anthropic, 비동기 멀티테넌트 AI 처리를 위한 배치 API 공개: 클로드 비용 50% 절감, 기능 응답성 유지
출처: Dev.to
비동기 멀티 테넌트 AI 처리를 위한 Anthropic Batch API: 기능 응답성을 유지하면서 Claude 비용을 50% 절감
솔직히 말씀드리겠습니다. Claude를 사용하는 SaaS를 운영 중인데 아직 Batch API를 도입하지 않았다면, 여러분은 돈을 피처럼 흘려보내고 있는 겁니다. 은유가 아니라 문자 그대로, 다음 60초 안에 응답이 필요 없는 모든 요청에 대해 토큰당 비용이 50% 절감됩니다.
CitizenApp에서는 매일 수천 건의 문서 요약, 규정 분류, 정책 분석을 처리합니다. 처음엔 동기 방식이 직관적이었습니다: 사용자가 문서를 업로드 → Claude가 실시간으로 응답 → 대시보드가 업데이트. 반응성이 좋았지만 현금도 불태우는 느낌이었습니다.
그때 깨달았습니다: 요청의 80%는 실제로 동기 응답이 필요하지 않다는 것을. 사용자가 문서 요약을 요청하면 5~30분 정도 기다려도 괜찮습니다. 테넌트가 500개의 정책을 일괄 분류하고 싶다면, 이는 명백히 비동기 작업입니다. Batch API는 바로 이런 경우를 위해 설계됐으며, 현재는 심각하게 활용도가 낮습니다.
아래는 우리가 CitizenApp에 적용한 방법이며, 여러분도 꼭 적용해 보시길 권합니다.
핵심 포인트
- 50% 비용 절감은 눈에 띄는 헤드라인이지만, 진짜 승리는 아키텍처에 있습니다. 배치를 도입하면 다음과 같이 관심사를 명확히 구분하게 됩니다.
- 동기 요청(빠르고 비쌈): 실시간 채팅, 문서 Q&A, 즉각적인 피드백 루프.
- 비동기 요청(느리지만 저렴함): 대량 작업, 예약 분석, 백그라운드 데이터 강화.
이 구분은 건강한 설계입니다. 모든 것을 Claude의 동기 API에 집어넣는 대신, 무엇이 급한지 고민하게 됩니다.
CitizenApp에서는 월 Claude 비용을 약 8천 달러에서 4천 달러 수준으로 절반으로 낮추면서도 기능 커버리지는 오히려 확대했습니다. 핵심은 전체 작업량의 **65%**를 배치 처리로 전환한 것이었습니다.
흐름도
User Request
↓
FastAPI Endpoint (Validate, Enqueue)
↓
PostgreSQL Queue Table
↓
Batch Processor (reads queue, submits to Anthropic)
↓
Anthropic Batch Job (runs in background)
↓
Webhook/Polling Handler (gets results)
↓
PostgreSQL LISTEN/NOTIFY
↓
WebSocket → React 19 Dashboard (real-time update)
구현해 보기
models.py
from sqlalchemy import Column, String, Integer, Text, DateTime, Enum, ForeignKey
from sqlalchemy.orm import declarative_base
from datetime import datetime
import enum
Base = declarative_base()
class AIJobStatus(str, enum.Enum):
QUEUED = "queued"
SUBMITTED = "submitted"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
class AIJob(Base):
__tablename__ = "ai_jobs"
id = Column(String, primary_key=True)
tenant_id = Column(String, ForeignKey("tenants.id"), nullable=False, index=True)
user_id = Column(String, nullable=False)
job_type = Column(String, nullable=False) # "summarize", "classify", etc.
status = Column(Enum(AIJobStatus), default=AIJobStatus.QUEUED, index=True)
input_data = Column(Text, nullable=False) # JSON stringified
result = Column(Text, nullable=True) # Result from Claude
batch_id = Column(String, nullable=True, index=True) # Anthropic batch ID
request_id = Column(String, nullable=True) # Within batch
created_at = Column(DateTime, default=datetime.utcnow, index=True)
completed_at = Column(DateTime, nullable=True)
error_message = Column(Text, nullable=True)
api/ai.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import uuid
import json
from sqlalchemy.orm import Session
from database import get_db
from models import AIJob, AIJobStatus
router = APIRouter()
class SummarizeRequest(BaseModel):
document_text: str
max_length: int = 500
@router.post("/api/ai/summarize")
async def queue_summarize(
request: SummarizeRequest,
db: Session = Depends(get_db),
tenant_id: str = Depends(get_tenant_id),
user_id: str = Depends(get_user_id),
):
"""
문서 요약을 비동기로 큐에 넣습니다.
즉시 작업 ID를 반환합니다.
배치가 완료되면 WebSocket을 통해 결과를 전달합니다.
"""
job_id = str(uuid.uuid4())
job = AIJob(
id=job_id,
tenant_id=tenant_id,
user_id=user_id,
job_type="summarize",
input_data=json.dumps({
"document_text": request.document_text,
"max_length": request.max_length,
}),
status=AIJobStatus.QUEUED,
)
db.add(job)
db.commit()
return {
"job_id": job_id,
"status": "queued",
"message": "요청이 큐에 들어갔습니다. 5~30분 안에 결과를 받아보실 수 있습니다.",
}
배치 워커 (예시: APScheduler)
# workers/batch_processor.py
from anthropic import Anthropic
from sqlalchemy.orm import Session
from database import SessionLocal
from models import AIJob, AIJobStatus
import json
from datetime import datetime
client = Anthropic()
def process_batch_jobs():
"""
5분마다 실행 (APScheduler 사용).
대기 중인 작업을 모아 Anthropic Batch API에 제출합니다.
"""
db = SessionLocal()
# 대기 중인 작업을 최대 100개(배치당 10k 요청 제한) 가져오기
queued_jobs = db.query(AIJob).filter(
AIJob.status == AIJobStatus.QUEUED
).limit(100).all()
if not queued_jobs:
db.close()
return
# 배치 요청 구성
requests = []
job_map = {}
for job in queued_jobs:
input_data = json.loads(job.input_data)
if job.job_type == "summarize":
message = f"다음 문서를 {input_data['max_length']} 단어 이내로 요약해 주세요:\n\n{input_data['document_text']}"
elif job.job_type == "classify":
message = f"다음 텍스트를 {input_data['categories']} 중 하나로 분류해 주세요:\n\nText: {input_data['text']}"
else:
continue
request_id = job.id
job_map[request_id] = job.id
requests.append({
"custom_id": request_id,
"params": {
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1024,
"messages": [{"role": "user", "content": message}],
},
})
# Anthropic에 배치 제출
batch = client.beta.messages.batches.create(
requests=requests,
betas=["batch-2024-09-24"],
)
# 모든 작업을 'submitted' 상태로 전환
for job in queued_jobs:
job.status = AIJobStatus.SUBMITTED
job.batch_id = batch.id
db.commit()
print(f"Batch {batch.id} 제출 완료 (요청 수: {len(requests)})")
db.close()
def poll_batch_results():
"""
30초마다 실행.
제출된 배치를 확인하고 결과를 저장합니다.
"""
db = SessionLocal()
# 제출된 작업 모두 조회
submitted_jobs = db.query(AIJob).filter(
AIJob.status == AIJobStatus.SUBMITTED
).all()
batch_ids = set(job.batch_id for job in submitted_jobs)
for batch_id in batch_ids:
batch = client.beta.messages.batches.retrieve(batch_id, betas=["batch-2024-09-24"])
if batch.processing_status == "in_progress":
continue
if batch.processing_status == "expired":
# 배치가 만료된 경우 모두 실패 처리
for job in submitted_jobs:
if job.batch_id == batch_id:
job.status = AIJobStatus.FAILED
job.error_message = "Batch expired"
db.commit()
continue
# 배치 완료 → 결과 가져오기
results = client.beta.messages.batches.results(batch_id, betas=["batch-2024-09-24"])
for result in results:
request_id = result.custom_id
job = db.query(AIJob).filter(AIJob.id == request_id).first()
if not job:
continue
if result.result.type == "succeeded":
job.status = AIJobStatus.COMPLETED
job.result = result.result.message.content[0].text
elif result.result.type == "failed":
job.status = AIJobStatus.FAILED
job.error_message = result.result.error.message
db.commit()
db.close()
위 코드를 기반으로 FastAPI 엔드포인트 → PostgreSQL 큐 → 배치 프로세서 → Anthropic 배치 → 결과 폴링 → WebSocket 흐름을 구현하면, 비용을 크게 절감하면서도 사용자에게는 여전히 실시간 업데이트(웹소켓) 경험을 제공할 수 있습니다.
시작해 보세요—비용 절감과 아키텍처 정리가 동시에 이루어집니다!