I Built a Production AI Agent Handling 50,000 Messages a Month — What the Tutorials Don't Tell You

Published: December 16, 2025, 05:54 GMT+8
5 min read
Source: Dev.to

Introduction

Three months ago, I deployed an AI agent to production. Today it handles 50,000+ messages a month with zero downtime. The tutorials I learned from only taught a "hello-world" chatbot; they never prepared me for real-world problems like users hammering the API at 3 a.m., or the LLM fabricating customer data. Here's what actually happened.

What the Tutorials Show

# The "perfect" AI agent
agent = AIAgent(model="gpt-4")
response = agent.chat("Hello!")
print(response)  # Magic! ✨

What Production Actually Looks Like

graph TB
    A[User Message] --> B{Rate Limiter}
    B -->|Allowed| C[Queue System]
    B -->|Blocked| D[429 Response]
    C --> E{Health Check}
    E -->|Healthy| F[AI Agent]
    E -->|Degraded| G[Fallback Handler]
    F --> H{Response Validator}
    H -->|Valid| I[User]
    H -->|Hallucination| J[Retry Logic]
    G --> I
    J --> F

A production AI agent needs multiple layers of protection that tutorials never mention.

The Tutorial (YOLO) Approach

while True:
    message = get_message()
    response = ai_agent.process(message)

The Production Approach

from collections import defaultdict
from datetime import datetime, timedelta

class AdaptiveRateLimiter:
    def __init__(self, base_limit=100):
        self.limits = defaultdict(lambda: {"count": 0, "reset": datetime.now()})
        self.base_limit = base_limit

    def check_limit(self, user_id: str, risk_score: float) -> bool:
        """Adaptive rate limiting based on user behavior"""
        limit_data = self.limits[user_id]

        # Reset window
        if datetime.now() > limit_data["reset"]:
            limit_data["count"] = 0
            limit_data["reset"] = datetime.now() + timedelta(hours=1)

        # Adjust limit based on risk
        adjusted_limit = int(self.base_limit * (1 - risk_score))

        if limit_data["count"] >= adjusted_limit:
            return False

        limit_data["count"] += 1
        return True

Why this matters: in the first month, this blocked 2,847 abuse attempts and saved over $500 in wasted API calls.
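One thing `check_limit` takes for granted is the `risk_score` argument, which the article never shows being computed. A minimal sketch of how such a 0-1 score might be derived from behavior signals (the signal names and weights here are illustrative assumptions, not from the article):

```python
def risk_score(failed_validations: int, blocked_requests: int, account_age_days: int) -> float:
    """Combine simple behavior signals into a 0-1 risk score (higher = riskier)."""
    score = 0.0
    score += min(failed_validations * 0.1, 0.4)   # repeated bad inputs
    score += min(blocked_requests * 0.05, 0.4)    # prior rate-limit hits
    if account_age_days < 7:                      # brand-new accounts are riskier
        score += 0.2
    return min(score, 1.0)
```

A trusted long-standing account scores 0.0 and keeps the full base limit; a new account with a history of blocks can be throttled down to nothing.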

The Hallucination Problem

A user asked for their account balance, and the AI replied:

"Your balance is $127,549.32"

The actual balance was $47.15.

The Solution

import re
from typing import Optional

class ResponseValidator:
    def __init__(self):
        # Patterns that should NEVER appear in responses
        self.forbidden_patterns = [
            r'\$[\d,]+\.\d{2}',                     # Dollar amounts
            r'\b\d{3}-\d{2}-\d{4}\b',               # SSN
            r'\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b',  # Emails
        ]

    def extract_balance(self, response: str) -> Optional[float]:
        """Pull the first dollar figure out of a response, if any"""
        match = re.search(r'\$([\d,]+\.\d{2})', response)
        if match:
            return float(match.group(1).replace(",", ""))
        return None

    def validate(self, response: str, user_context: dict) -> Optional[str]:
        """Validate AI response against business rules"""

        # Check for forbidden patterns
        for pattern in self.forbidden_patterns:
            if re.search(pattern, response, re.IGNORECASE):
                return None  # Reject response

        # Verify factual claims (defense in depth: the dollar-amount
        # pattern above already rejects most of these outright)
        if "balance" in response.lower():
            claimed_balance = self.extract_balance(response)
            actual_balance = user_context.get("balance")

            if (claimed_balance is not None and actual_balance is not None
                    and abs(claimed_balance - actual_balance) > 0.01):
                return None  # Hallucination detected

        return response

The result: no more hallucinated financial figures in production.
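The core of the fix, the pattern-rejection rule, can be demonstrated in isolation. A standalone sketch (pattern list abridged from the validator above) showing a hallucinated balance being rejected:

```python
import re

# Abridged forbidden-pattern list: any dollar amount or SSN in a model
# response is treated as potentially hallucinated and rejected outright.
FORBIDDEN = [
    r'\$[\d,]+\.\d{2}',        # dollar amounts
    r'\b\d{3}-\d{2}-\d{4}\b',  # SSNs
]

def is_safe(response: str) -> bool:
    """True only if no forbidden pattern appears in the response."""
    return not any(re.search(p, response) for p in FORBIDDEN)
```

With this rule, `is_safe("Your balance is $127,549.32")` returns False, while a response that merely offers to look up the balance passes.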

Managing Conversation Context at Scale

from collections import deque
from dataclasses import dataclass
from typing import List

@dataclass
class Message:
    role: str
    content: str
    tokens: int
    importance: float  # 0‑1 score

class SmartContextManager:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
        self.messages = deque()

    def add_message(self, message: Message):
        self.messages.append(message)
        self._trim_context()

    def _trim_context(self):
        """Keep the most important messages within the token limit"""
        total_tokens = sum(m.tokens for m in self.messages)

        # Drop the least important messages first until we fit the budget
        sorted_msgs = sorted(self.messages, key=lambda m: m.importance)
        while total_tokens > self.max_tokens and sorted_msgs:
            removed = sorted_msgs.pop(0)
            self.messages.remove(removed)
            total_tokens -= removed.tokens

This trimming saves roughly $1,200 a month in API costs.
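The trimming strategy boils down to: drop the least important messages until the history fits the token budget. A self-contained sketch of that idea, using bare `(tokens, importance)` tuples instead of the `Message` dataclass for brevity:

```python
def trim(messages, max_tokens):
    """messages: list of (tokens, importance) tuples.
    Drop the least important messages until the total fits max_tokens."""
    kept = list(messages)
    total = sum(tokens for tokens, _ in kept)
    by_importance = sorted(kept, key=lambda m: m[1])  # least important first
    while total > max_tokens and by_importance:
        victim = by_importance.pop(0)   # cheapest message to lose
        kept.remove(victim)
        total -= victim[0]
    return kept
```

Note that trimming by importance rather than recency is what makes this "smart": a pivotal early message survives while low-value chit-chat is evicted.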

The Metrics That Actually Matter

pie
    title What Breaks AI Agents in Production
    "Rate Limit Abuse": 35
    "LLM Timeouts": 25
    "Hallucinations": 20
    "Network Issues": 15
    "Database Locks": 5

The Monitoring Stack

from dataclasses import dataclass
from datetime import datetime
import logging
from typing import Optional

@dataclass
class AgentMetrics:
    timestamp: datetime
    response_time_ms: float
    tokens_used: int
    cost_usd: float
    user_satisfaction: float
    error_type: Optional[str]

    def log(self):
        logging.info(
            "agent_response",
            extra={
                "duration_ms": self.response_time_ms,
                "tokens": self.tokens_used,
                "cost": self.cost_usd,
                "satisfaction": self.user_satisfaction,
                "error": self.error_type,
            },
        )

class AgentMonitor:
    def __init__(self):
        self.metrics = []
        self.alerts = {
            "high_latency": 2000,      # ms
            "low_satisfaction": 0.6,   # 0‑1
            "error_rate": 0.05,        # 5%
        }

    async def track_request(self, request_fn):
        start = datetime.now()
        result = None          # defined up front so the finally block
        satisfaction = 0.0     # is safe even when request_fn raises
        error = None

        try:
            result = await request_fn()
            satisfaction = self.calculate_satisfaction(result)
        except Exception as e:
            error = str(e)
            raise
        finally:
            duration = (datetime.now() - start).total_seconds() * 1000

            metric = AgentMetrics(
                timestamp=datetime.now(),
                response_time_ms=duration,
                tokens_used=getattr(result, "tokens", 0),
                cost_usd=self.calculate_cost(result) if result else 0.0,
                user_satisfaction=satisfaction if error is None else 0,
                error_type=error,
            )

            metric.log()
            self.check_alerts(metric)
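`check_alerts` is referenced above but never shown. A hypothetical version of its threshold logic, mirroring the `alerts` dict in `AgentMonitor` (the function name and return shape are my assumptions):

```python
# Thresholds mirror the alerts dict in AgentMonitor above.
THRESHOLDS = {"high_latency": 2000, "low_satisfaction": 0.6}

def fired_alerts(response_time_ms: float, satisfaction: float) -> list:
    """Return the names of any alert thresholds this request crossed."""
    alerts = []
    if response_time_ms > THRESHOLDS["high_latency"]:
        alerts.append("high_latency")
    if satisfaction < THRESHOLDS["low_satisfaction"]:
        alerts.append("low_satisfaction")
    return alerts
```

In production you would route a non-empty result to your paging or dashboard system rather than just returning it.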

Handling Provider Outages

The Wrong Way

# Hope and pray
response = openai.ChatCompletion.create(...)

The Production Way: Fallbacks

from typing import List, Callable
import asyncio
import logging

class AIAgentWithFallbacks:
    def __init__(self):
        self.providers = [
            self.primary_ai,      # OpenAI GPT-4
            self.secondary_ai,    # Anthropic Claude
            self.rule_based,      # Template responses
            self.human_handoff,   # Last resort
        ]

    async def get_response(self, message: str, max_retries: int = 3) -> str:
        """Try providers in order until success"""
        for provider in self.providers:
            for attempt in range(max_retries):
                try:
                    response = await provider(message)
                    if self.is_valid_response(response):
                        return response
                except Exception as e:
                    logging.warning(f"{provider.__name__} failed: {e}")
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
                    continue
        # All providers failed
        return (
            "I apologize, but I'm having technical difficulties. "
            "A human agent will assist you shortly."
        )
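The fallback chain pattern can be shown in miniature without async machinery. This standalone sketch uses stub providers (the names and canned messages are illustrative, not from the article):

```python
import logging

def failing_provider(msg: str) -> str:
    """Stand-in for a provider that is currently down."""
    raise TimeoutError("provider down")

def rule_based(msg: str) -> str:
    """Stand-in for a template-based fallback."""
    return "Thanks for your message - an agent will follow up."

def get_response(message: str, providers) -> str:
    """Try each provider in order; return the first successful answer."""
    for provider in providers:
        try:
            return provider(message)
        except Exception as e:
            logging.warning("%s failed: %s", provider.__name__, e)
    return "I'm having technical difficulties."
```

The ordering encodes a graceful degradation policy: each layer is cheaper and dumber than the one before it, and the user always gets *some* answer.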

Production Numbers

  • Primary provider availability: 99.2%
  • Fallback activations: 124/month
  • User complaints about outages: 0

The Architecture That Actually Works

(Article truncated; to be continued…)
