I Built a Production AI Agent That Handles 50,000 Messages a Month: What the Tutorials Don't Tell You
Published: December 16, 2025, 05:54 GMT+8
5 min read
Source: Dev.to
Introduction
Three months ago, I deployed an AI agent to production. Today it handles 50,000+ messages a month with zero downtime. The tutorials I learned from only taught me to build a "hello world" chatbot; they never prepared me for real-world problems like users hammering the API at 3 a.m., or the LLM fabricating customer data. Here is what actually happened.
What the Tutorials Show
# The "perfect" AI agent
agent = AIAgent(model="gpt-4")
response = agent.chat("Hello!")
print(response) # Magic! ✨
What Production Actually Looks Like
graph TB
A[User Message] --> B{Rate Limiter}
B -->|Allowed| C[Queue System]
B -->|Blocked| D[429 Response]
C --> E{Health Check}
E -->|Healthy| F[AI Agent]
E -->|Degraded| G[Fallback Handler]
F --> H{Response Validator}
H -->|Valid| I[User]
H -->|Hallucination| J[Retry Logic]
G --> I
J --> F
An AI agent in production needs multiple layers of protection, none of which the tutorials ever mention.
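As a rough sketch of how those layers compose (the function names here are hypothetical stand-ins, not code from my actual system):

```python
def handle(message, user_id, *, allow, agent, validate, fallback):
    """Minimal pipeline mirroring the diagram:
    rate limiter -> AI agent -> response validator -> fallback handler."""
    if not allow(user_id):                # Rate Limiter
        return (429, "Too Many Requests")
    raw = agent(message)                  # AI Agent
    checked = validate(raw)               # Response Validator
    if checked is None:                   # hallucination / policy violation
        return (200, fallback(message))   # Fallback Handler
    return (200, checked)
```

Each layer is swappable: the limiter, validator, and fallback are plain callables, so they can be tested in isolation.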
The Tutorial (YOLO) Approach
while True:
    message = get_message()
    response = ai_agent.process(message)
The Production Approach
from collections import defaultdict
from datetime import datetime, timedelta

class AdaptiveRateLimiter:
    def __init__(self, base_limit=100):
        self.limits = defaultdict(lambda: {"count": 0, "reset": datetime.now()})
        self.base_limit = base_limit

    def check_limit(self, user_id: str, risk_score: float) -> bool:
        """Adaptive rate limiting based on user behavior"""
        limit_data = self.limits[user_id]
        # Reset window
        if datetime.now() > limit_data["reset"]:
            limit_data["count"] = 0
            limit_data["reset"] = datetime.now() + timedelta(hours=1)
        # Adjust limit based on risk
        adjusted_limit = int(self.base_limit * (1 - risk_score))
        if limit_data["count"] >= adjusted_limit:
            return False
        limit_data["count"] += 1
        return True
Why it matters: in the first month, this blocked 2,847 abuse attempts and saved over $500 in wasted API calls.
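To see what the risk adjustment does in practice, here is the quota formula from check_limit in isolation (the base limit of 100 and the risk scores are illustrative; how you compute risk_score from user behavior is up to your own abuse heuristics):

```python
def adjusted_limit(base_limit: int, risk_score: float) -> int:
    # Same formula as in AdaptiveRateLimiter.check_limit:
    # higher risk -> smaller hourly quota
    return int(base_limit * (1 - risk_score))
```

A trusted user (risk 0.0) keeps the full quota, while a user scoring 0.5 has it halved.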
The Hallucination Problem
A user asked about their account balance, and the AI replied:
"Your balance is $127,549.32"
The actual balance was $47.15.
The Solution
import re
from typing import Optional

class ResponseValidator:
    def __init__(self):
        # Patterns that should NEVER appear in responses
        self.forbidden_patterns = [
            r'\$[\d,]+\.\d{2}',                            # Dollar amounts
            r'\b\d{3}-\d{2}-\d{4}\b',                      # SSN
            r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b',  # Emails
        ]

    def validate(self, response: str, user_context: dict) -> Optional[str]:
        """Validate AI response against business rules"""
        # Check for forbidden patterns
        for pattern in self.forbidden_patterns:
            if re.search(pattern, response, re.IGNORECASE):
                return None  # Reject response
        # Verify factual claims
        if "balance" in response.lower():
            claimed_balance = self.extract_balance(response)
            actual_balance = user_context.get("balance")
            if (claimed_balance is not None and actual_balance is not None
                    and abs(claimed_balance - actual_balance) > 0.01):
                return None  # Hallucination detected
        return response

    def extract_balance(self, response: str) -> Optional[float]:
        """Pull the first dollar amount out of a response, if any."""
        match = re.search(r'\$([\d,]+\.\d{2})', response)
        return float(match.group(1).replace(",", "")) if match else None
Result: no hallucinated financial data has made it to users in production since.
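The dollar-amount pattern is the workhorse here; pulled out on its own, it catches exactly the kind of response from the incident above:

```python
import re

# The dollar-amount pattern from ResponseValidator's forbidden list
DOLLAR_AMOUNT = re.compile(r'\$[\d,]+\.\d{2}')

def leaks_dollar_amount(text: str) -> bool:
    """True if the response contains a concrete dollar figure."""
    return DOLLAR_AMOUNT.search(text) is not None
```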
Managing Conversation Context at Scale
from collections import deque
from dataclasses import dataclass

@dataclass
class Message:
    role: str
    content: str
    tokens: int
    importance: float  # 0-1 score

class SmartContextManager:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
        self.messages = deque()

    def add_message(self, message: Message):
        self.messages.append(message)
        self._trim_context()

    def _trim_context(self):
        """Keep most important messages within token limit"""
        total_tokens = sum(m.tokens for m in self.messages)
        # Evict the least important messages first
        sorted_msgs = sorted(self.messages, key=lambda m: m.importance)
        while total_tokens > self.max_tokens and sorted_msgs:
            removed = sorted_msgs.pop(0)
            self.messages.remove(removed)
            total_tokens -= removed.tokens
This trimming saves roughly $1,200 per month in API costs.
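The eviction policy is easier to reason about stripped down to plain data. This is the same importance-first trimming as in _trim_context, but on bare (tokens, importance) tuples so it stands alone:

```python
def trim(messages, max_tokens):
    """Drop the lowest-importance messages until the token budget fits.

    messages: list of (tokens, importance) tuples, a simplified stand-in
    for the Message dataclass.
    """
    kept = list(messages)
    while sum(tokens for tokens, _ in kept) > max_tokens:
        # Evict the least important message first; order is preserved
        kept.remove(min(kept, key=lambda m: m[1]))
    return kept
```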
The Metrics That Actually Matter
pie
title What Breaks AI Agents in Production
"Rate Limit Abuse": 35
"LLM Timeouts": 25
"Hallucinations": 20
"Network Issues": 15
"Database Locks": 5
The Monitoring Stack
from dataclasses import dataclass
from datetime import datetime
import logging
from typing import Optional

@dataclass
class AgentMetrics:
    timestamp: datetime
    response_time_ms: float
    tokens_used: int
    cost_usd: float
    user_satisfaction: float
    error_type: Optional[str]

    def log(self):
        logging.info(
            "agent_response",
            extra={
                "duration_ms": self.response_time_ms,
                "tokens": self.tokens_used,
                "cost": self.cost_usd,
                "satisfaction": self.user_satisfaction,
                "error": self.error_type,
            },
        )

class AgentMonitor:
    def __init__(self):
        self.metrics = []
        self.alerts = {
            "high_latency": 2000,     # ms
            "low_satisfaction": 0.6,  # 0-1
            "error_rate": 0.05,       # 5%
        }

    async def track_request(self, request_fn):
        start = datetime.now()
        result = None
        satisfaction = 0.0
        error = None
        try:
            result = await request_fn()
            satisfaction = self.calculate_satisfaction(result)
        except Exception as e:
            error = str(e)
            raise
        finally:
            # Runs whether the request succeeded or raised
            duration = (datetime.now() - start).total_seconds() * 1000
            metric = AgentMetrics(
                timestamp=datetime.now(),
                response_time_ms=duration,
                tokens_used=getattr(result, "tokens", 0),
                cost_usd=self.calculate_cost(result) if result else 0.0,
                user_satisfaction=satisfaction if error is None else 0,
                error_type=error,
            )
            metric.log()
            self.check_alerts(metric)
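check_alerts is not shown above; a minimal interpretation of the thresholds in self.alerts might look like this (the error-rate input would come from a sliding window you maintain separately, which the snippet above does not show):

```python
ALERTS = {"high_latency": 2000, "low_satisfaction": 0.6, "error_rate": 0.05}

def fired_alerts(response_time_ms: float, satisfaction: float, error_rate: float):
    """Return the names of the alert thresholds this request trips."""
    fired = []
    if response_time_ms > ALERTS["high_latency"]:
        fired.append("high_latency")
    if satisfaction < ALERTS["low_satisfaction"]:
        fired.append("low_satisfaction")
    if error_rate > ALERTS["error_rate"]:
        fired.append("error_rate")
    return fired
```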
Handling Provider Outages
The Wrong Way
# Hope and pray
response = openai.ChatCompletion.create(...)
The Production Way: Fallbacks
import asyncio
import logging

class AIAgentWithFallbacks:
    def __init__(self):
        self.providers = [
            self.primary_ai,    # OpenAI GPT-4
            self.secondary_ai,  # Anthropic Claude
            self.rule_based,    # Template responses
            self.human_handoff, # Last resort
        ]

    async def get_response(self, message: str, max_retries: int = 3) -> str:
        """Try providers in order until success"""
        for provider in self.providers:
            for attempt in range(max_retries):
                try:
                    response = await provider(message)
                    if self.is_valid_response(response):
                        return response
                except Exception as e:
                    logging.warning(f"{provider.__name__} failed: {e}")
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
        # All providers failed
        return (
            "I apologize, but I'm having technical difficulties. "
            "A human agent will assist you shortly."
        )
Production Numbers
- Primary provider availability: 99.2%
- Fallback triggers: 124/month
- User complaints about outages: 0
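Stripped of the class plumbing, the fallback chain can be exercised end to end. Both providers below are hypothetical stand-ins: flaky_llm simulates an outage, rule_based is the template tier.

```python
import asyncio
import logging

async def flaky_llm(message: str) -> str:
    # Stand-in for a primary provider that is currently down
    raise TimeoutError("provider unreachable")

async def rule_based(message: str) -> str:
    # Template fallback that always succeeds
    return f"Thanks, we received: {message!r}. A human will follow up."

async def get_response(message: str, providers) -> str:
    """Walk the provider list, returning the first successful answer."""
    for provider in providers:
        try:
            return await provider(message)
        except Exception as exc:
            logging.warning("%s failed: %s", provider.__name__, exc)
    return "I apologize, but I'm having technical difficulties."
```

Running `asyncio.run(get_response("refund status?", [flaky_llm, rule_based]))` logs the primary failure and returns the template answer instead of surfacing the outage to the user.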
An Architecture That Actually Works
(Section continues…)