I Built a Production AI Agent Handling 50K Messages a Month - What Tutorials Don't Tell You
Source: Dev.to
Introduction
Three months ago, I deployed an AI agent to production. Today it handles 50,000+ messages a month with zero downtime. The tutorials I followed only covered "hello-world" chatbots, and nothing prepared me for what actually happens in the field: users spamming the API at 3 AM, or the LLM hallucinating customer data. Here's the story of what really happened.
What tutorials show you
# The "perfect" AI agent
agent = AIAgent(model="gpt-4")
response = agent.chat("Hello!")
print(response) # Magic! ✨
What production looks like
graph TB
A[User Message] --> B{Rate Limiter}
B -->|Allowed| C[Queue System]
B -->|Blocked| D[429 Response]
C --> E{Health Check}
E -->|Healthy| F[AI Agent]
E -->|Degraded| G[Fallback Handler]
F --> H{Response Validator}
H -->|Valid| I[User]
H -->|Hallucination| J[Retry Logic]
G --> I
J --> F
A production AI agent needs multiple layers of protection that tutorials never even mention.
The tutorial (YOLO) approach
while True:
    message = get_message()
    response = ai_agent.process(message)
The production approach
from collections import defaultdict
from datetime import datetime, timedelta

class AdaptiveRateLimiter:
    def __init__(self, base_limit=100):
        self.limits = defaultdict(lambda: {"count": 0, "reset": datetime.now()})
        self.base_limit = base_limit

    def check_limit(self, user_id: str, risk_score: float) -> bool:
        """Adaptive rate limiting based on user behavior"""
        limit_data = self.limits[user_id]

        # Reset window
        if datetime.now() > limit_data["reset"]:
            limit_data["count"] = 0
            limit_data["reset"] = datetime.now() + timedelta(hours=1)

        # Adjust limit based on risk
        adjusted_limit = int(self.base_limit * (1 - risk_score))

        if limit_data["count"] >= adjusted_limit:
            return False

        limit_data["count"] += 1
        return True
Why it matters: in month one this blocked 2,847 abuse attempts, saving over $500 in wasted API calls.
The hallucination problem
When a user asked for their account balance, the AI replied:
“Your balance is $127,549.32”
The actual balance was $47.15.
The fix
import re
from typing import Optional

class ResponseValidator:
    def __init__(self):
        # Patterns that should NEVER appear in responses
        self.forbidden_patterns = [
            r'\$[\d,]+\.\d{2}',                            # Dollar amounts
            r'\b\d{3}-\d{2}-\d{4}\b',                      # SSN
            r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b',  # Emails
        ]

    def extract_balance(self, response: str) -> Optional[float]:
        """Pull the first dollar amount out of a response, if any"""
        match = re.search(r'\$([\d,]+\.\d{2})', response)
        return float(match.group(1).replace(",", "")) if match else None

    def validate(self, response: str, user_context: dict) -> Optional[str]:
        """Validate AI response against business rules"""
        # Check for forbidden patterns
        for pattern in self.forbidden_patterns:
            if re.search(pattern, response, re.IGNORECASE):
                return None  # Reject response

        # Verify factual claims
        if "balance" in response.lower():
            claimed_balance = self.extract_balance(response)
            actual_balance = user_context.get("balance")
            if (claimed_balance is not None and actual_balance is not None
                    and abs(claimed_balance - actual_balance) > 0.01):
                return None  # Hallucination detected

        return response
The result: zero incidents of hallucinated financial data in production.
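The pattern check on its own is easy to sanity-test. This standalone sketch (the helper name `contains_pii` is mine, for illustration) runs the same regexes against sample responses:

```python
import re

# The same forbidden patterns as in the validator
forbidden_patterns = [
    r'\$[\d,]+\.\d{2}',                            # dollar amounts
    r'\b\d{3}-\d{2}-\d{4}\b',                      # SSN-like numbers
    r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b',  # email addresses
]

def contains_pii(text: str) -> bool:
    """True if any forbidden pattern appears in the text."""
    return any(re.search(p, text, re.IGNORECASE) for p in forbidden_patterns)

print(contains_pii("Your balance is $127,549.32"))        # True  -> reject
print(contains_pii("Contact support@example.com"))        # True  -> reject
print(contains_pii("Happy to help with your question!"))  # False -> pass
```

A response containing any dollar amount is rejected outright; amounts the user is entitled to see have to come from a verified source, not from free-form LLM text.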
Managing conversation context at scale
from collections import deque
from dataclasses import dataclass

@dataclass
class Message:
    role: str
    content: str
    tokens: int
    importance: float  # 0-1 score

class SmartContextManager:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
        self.messages = deque()

    def add_message(self, message: Message):
        self.messages.append(message)
        self._trim_context()

    def _trim_context(self):
        """Keep most important messages within token limit"""
        total_tokens = sum(m.tokens for m in self.messages)
        # Evict the least important messages first
        sorted_msgs = sorted(self.messages, key=lambda m: m.importance)
        while total_tokens > self.max_tokens and sorted_msgs:
            removed = sorted_msgs.pop(0)
            self.messages.remove(removed)
            total_tokens -= removed.tokens
This pruning saves roughly $1,200/month in API costs.
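Here is the trimming behavior in a compact, runnable form; the token counts and message contents are made up for illustration:

```python
from collections import deque
from dataclasses import dataclass

@dataclass
class Message:
    role: str
    content: str
    tokens: int
    importance: float  # 0-1 score

class SmartContextManager:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
        self.messages = deque()

    def add_message(self, message: Message):
        self.messages.append(message)
        self._trim_context()

    def _trim_context(self):
        # Evict lowest-importance messages until we fit the token budget
        total_tokens = sum(m.tokens for m in self.messages)
        sorted_msgs = sorted(self.messages, key=lambda m: m.importance)
        while total_tokens > self.max_tokens and sorted_msgs:
            removed = sorted_msgs.pop(0)
            self.messages.remove(removed)
            total_tokens -= removed.tokens

ctx = SmartContextManager(max_tokens=100)
ctx.add_message(Message("system", "You are a support bot", 40, 1.0))
ctx.add_message(Message("user", "hi there", 30, 0.2))
ctx.add_message(Message("user", "what's my order status?", 50, 0.9))
# 120 tokens > 100, so the low-importance greeting is evicted first
print([m.content for m in ctx.messages])
# ['You are a support bot', "what's my order status?"]
```

The low-importance greeting is sacrificed, while the system prompt and the actual question survive; chronological order within the deque is preserved.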
Metrics that actually matter
pie
title What Breaks AI Agents in Production
"Rate Limit Abuse": 35
"LLM Timeouts": 25
"Hallucinations": 20
"Network Issues": 15
"Database Locks": 5
Monitoring stack
from dataclasses import dataclass
from datetime import datetime
import logging
from typing import Optional

@dataclass
class AgentMetrics:
    timestamp: datetime
    response_time_ms: float
    tokens_used: int
    cost_usd: float
    user_satisfaction: float
    error_type: Optional[str]

    def log(self):
        logging.info(
            "agent_response",
            extra={
                "duration_ms": self.response_time_ms,
                "tokens": self.tokens_used,
                "cost": self.cost_usd,
                "satisfaction": self.user_satisfaction,
                "error": self.error_type,
            },
        )

class AgentMonitor:
    def __init__(self):
        self.metrics = []
        self.alerts = {
            "high_latency": 2000,     # ms
            "low_satisfaction": 0.6,  # 0-1
            "error_rate": 0.05,       # 5%
        }

    async def track_request(self, request_fn):
        start = datetime.now()
        result = None          # stays None if the request raises
        satisfaction = 0.0
        error = None
        try:
            result = await request_fn()
            satisfaction = self.calculate_satisfaction(result)
        except Exception as e:
            error = str(e)
            raise
        finally:
            duration = (datetime.now() - start).total_seconds() * 1000
            metric = AgentMetrics(
                timestamp=datetime.now(),
                response_time_ms=duration,
                tokens_used=getattr(result, "tokens", 0),
                cost_usd=self.calculate_cost(result),
                user_satisfaction=satisfaction if error is None else 0,
                error_type=error,
            )
            metric.log()
            self.check_alerts(metric)
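The same thresholds can be applied to a batch of logged metrics. This is a hypothetical standalone helper, separate from the per-request `check_alerts` inside `AgentMonitor`, with sample data made up for illustration:

```python
from statistics import mean

# Thresholds mirror AgentMonitor.alerts
ALERTS = {"high_latency": 2000, "low_satisfaction": 0.6, "error_rate": 0.05}

def triggered_alerts(metrics):
    """Return the names of alerts a batch of metrics would fire."""
    fired = []
    if mean(m["duration_ms"] for m in metrics) > ALERTS["high_latency"]:
        fired.append("high_latency")
    if mean(m["satisfaction"] for m in metrics) < ALERTS["low_satisfaction"]:
        fired.append("low_satisfaction")
    if sum(1 for m in metrics if m["error"]) / len(metrics) > ALERTS["error_rate"]:
        fired.append("error_rate")
    return fired

batch = [
    {"duration_ms": 800,  "satisfaction": 0.9, "error": None},
    {"duration_ms": 3500, "satisfaction": 0.2, "error": "timeout"},
]
print(triggered_alerts(batch))  # ['high_latency', 'low_satisfaction', 'error_rate']
```

In practice you would run this over a sliding window (say, the last 5 minutes of metrics) rather than the full history, so a single slow request doesn't page anyone.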
Handling provider outages
Bad approach
# Hope and pray
response = openai.ChatCompletion.create(...)
Production approach with fallbacks
import asyncio
import logging

class AIAgentWithFallbacks:
    def __init__(self):
        self.providers = [
            self.primary_ai,     # OpenAI GPT-4
            self.secondary_ai,   # Anthropic Claude
            self.rule_based,     # Template responses
            self.human_handoff,  # Last resort
        ]

    async def get_response(self, message: str, max_retries: int = 3) -> str:
        """Try providers in order until success"""
        for provider in self.providers:
            for attempt in range(max_retries):
                try:
                    response = await provider(message)
                    if self.is_valid_response(response):
                        return response
                except Exception as e:
                    logging.warning(f"{provider.__name__} failed: {e}")
                    await asyncio.sleep(2 ** attempt)  # exponential backoff

        # All providers failed
        return (
            "I apologize, but I'm having technical difficulties. "
            "A human agent will assist you shortly."
        )
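A stripped-down version of the fallback loop can be exercised with stub providers. Everything here (`flaky_primary`, `rule_based`) is illustrative, and the validity check is omitted for brevity:

```python
import asyncio
import logging

# Stand-in providers: the primary always times out, the rule-based
# fallback always answers.
async def flaky_primary(message: str) -> str:
    raise TimeoutError("provider down")

async def rule_based(message: str) -> str:
    return "Thanks for reaching out! A human agent will follow up."

async def get_response(message, providers, max_retries=2):
    for provider in providers:
        for attempt in range(max_retries):
            try:
                return await provider(message)
            except Exception as e:
                logging.warning("%s failed: %s", provider.__name__, e)
                await asyncio.sleep(0)  # real code: asyncio.sleep(2 ** attempt)
    return "I apologize, but I'm having technical difficulties."

answer = asyncio.run(get_response("hi", [flaky_primary, rule_based]))
print(answer)  # Thanks for reaching out! A human agent will follow up.
```

The primary exhausts its retries, the chain falls through to the template response, and the user never sees a raw exception.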
Stats from production
- Primary provider uptime: 99.2%
- Fallback triggers: 124 times/month
- User complaints about downtime: 0
The architecture that actually works
(Content continues…)