Agentic AI:Schema 验证的工具执行和确定性缓存
Source: Dev.to
请提供您希望翻译的具体文本内容,我将把它翻译成简体中文并保持原有的格式、Markdown 语法以及技术术语不变。谢谢!
概览
Agentic AI 系统之所以会出错,并不是因为模型无法推理,而是因为工具执行缺乏管理。
一旦允许代理进行规划、重试、自我批评或协作,工具调用就会迅速激增。若没有严格的控制,这会导致基础设施故障、成本不可预测增长以及行为的非确定性。
本文阐述了如何使用两种明确且相互独立的机制来构建 Agentic AI 系统的工具执行层:
- 基于合约的工具执行
- 确定性的工具结果缓存
每种机制解决一类不同的生产故障,且必须分别实现。
实际生产场景
背景
您正在为 SRE 团队构建一个 Incident Analysis Agent(事件分析代理)。
代理的功能
- 获取服务日志
- 分析错误模式
- 如果置信度低则重新获取日志
- 允许第二个代理(批评者)验证发现
工具特性
工具名称: fetch_service_logs
后端: Elasticsearch / Loki / Splunk
延迟: 300–800 ms
- 限流
- 每次执行成本高
这是一种常见的真实工作负载。
Source: …
第Ⅰ部分:基于合约的工具执行在智能体 AI 系统中的应用
没有合约时的问题
当大语言模型(LLM)直接输出工具参数时,运行时会收到如下输入:
{"service": "auth", "window": "24 hours"}
{"service": "Auth Service", "window": "yesterday"}
{"service": ["auth"], "window": 24}
{"service": "", "window": "24h"}
为什么会出现这种情况
- LLM 以自然语言进行推理
- LLM 会对参数进行同义改写
- LLM 不是类型安全的系统
生产环境中会出现的故障
- 无效的 Elasticsearch 查询
- 全索引扫描
- 查询构造器崩溃
- 静默的数据损坏
- 重试循环放大故障
依赖模型始终产生有效输入并不是系统设计的思路。
基于合约的工具执行意味着什么
基于合约的执行意味着:
- 运行时 拥有 工具接口
- 模型 必须遵循 该接口
- 无效输入 永远不会到达 基础设施
这与生产 API 中的边界强制完全相同。
步骤 1:定义严格的工具合约
from pydantic import BaseModel, Field, field_validator
import re
from typing import List
class FetchServiceLogsInput(BaseModel):
service: str = Field(
...,
description="Kubernetes 服务名称,全部小写,不含空格"
)
window: str = Field(
...,
description="时间窗口格式:5m、1h、24h"
)
@field_validator("service")
@classmethod
def validate_service(cls, value: str) -> str:
if not value:
raise ValueError("service cannot be empty")
if not re.fullmatch(r"[a-z0-9\-]+", value):
raise ValueError("service must be lowercase alphanumeric with dashes")
return value
@field_validator("window")
@classmethod
def validate_window(cls, value: str) -> str:
if not re.fullmatch(r"\d+(m|h)", value):
raise ValueError("window must be like 5m, 1h, 24h")
return value
class FetchServiceLogsOutput(BaseModel):
logs: List[str]
这些校验可以防止的情况
| 无效输入 | 防止的问题 |
|---|---|
| service 为空 | 全日志扫描 |
| 大小写混合或包含空格 | 查询不匹配 |
| 自然语言时间表达 | 歧义查询 |
| 列表或数字形式 | 查询构造器崩溃 |
除非通过此关卡,否则任何内容都不会进入基础设施。
步骤 2:实现实际工具
def fetch_service_logs(service: str, window: str) -> list[str]:
print(f"QUERY logs for service={service}, window={window}")
return [
f"[ERROR] timeout detected in {service}",
f"[WARN] retry triggered in {service}",
]
步骤 3:运行时拥有的工具注册表
TOOLS = {
"fetch_service_logs": {
"version": "v1",
"input_model": FetchServiceLogsInput,
"output_model": FetchServiceLogsOutput,
"handler": fetch_service_logs,
"cache_ttl": 3600, # seconds
}
}
智能体不能自行创建工具、绕过模式或更改版本。
步骤 4:基于合约的执行边界
def execute_tool_contract(tool_name: str, raw_args: dict):
tool = TOOLS[tool_name]
# 根据合约验证输入
args = tool["input_model"](**raw_args)
# 使用干净的 dict 调用处理函数
raw_result = tool["handler"](**args.model_dump())
# 将结果包装进输出模型
return tool["output_model"](logs=raw_result)
合约强制执行的执行流程
Agent emits tool call
↓
Raw arguments (untrusted)
↓
Schema validation
┌───────────────┐
│ Invalid │ → reject and re‑plan
└───────────────┘
↓
Valid
↓
Tool executes
↓
Infrastructure queried safely
第 II 部分:Agentic AI 系统中的确定性缓存
添加合约后的问题
即使进行完美的验证,代理仍会重复工作:
execute_tool_contract(
"fetch_service_logs",
{"service": "auth-service", "window": "24h"}
)
execute_tool_contract(
"fetch_service_logs",
{"window": "24h", "service": "auth-service"}
)
相同的意图、相同的后端,却被执行了两次。
为什么朴素的缓存会失效
{"service": "auth-service", "window": "24h"}
{"window": "24h", "service": "auth-service"}
不同的字符串 → 不同的缓存键,即使它们在语义上是相同的。
Agentic 系统需要语义等价,而不是原始字符串相等。
实现确定性缓存所需的基础设施
- 规范化(Canonicalisation) – 将传入的参数转换为确定的、已排序的表示(例如,排序后的 JSON)。
- 基于哈希的缓存键 – 对规范化后的负载以及工具版本计算稳定的哈希(SHA‑256)。
- 结果存储 – 持久化输出模型(或其序列化形式),并记录哈希和 TTL。
- 缓存查找包装器 – 在调用处理器之前检查缓存;命中时返回已存结果,未命中时执行并存储。
一个最小实现示例:
import json, hashlib, time
from collections import defaultdict
# Simple in‑memory cache for illustration
_CACHE = defaultdict(dict) # {tool_name: {hash: (timestamp, result)}}
def _canonicalise(args: dict) -> str:
"""Return a deterministic JSON string with sorted keys."""
return json.dumps(args, sort_keys=True, separators=(",", ":"))
def _hash_payload(tool_name: str, payload: str) -> str:
return hashlib.sha256(f"{tool_name}:{payload}".encode()).hexdigest()
def execute_with_cache(tool_name: str, raw_args: dict):
tool = TOOLS[tool_name]
# 1️⃣ Validate input
args = tool["input_model"](**raw_args)
# 2️⃣ Canonicalise & hash
payload = _canonicalise(args.model_dump())
key = _hash_payload(tool_name, payload)
# 3️⃣ Cache lookup
entry = _CACHE[tool_name].get(key)
if entry:
ts, cached_result = entry
# (Cache hit logic would go here)
return cached_result
# 4️⃣ Execute and store
raw_result = tool["handler"](**args.model_dump())
validated = tool["output_model"](logs=raw_result)
_CACHE[tool_name][key] = (time.time(), validated)
return validated
示例规范化形式
fetch_service_logs|auth-service|24h|v1
步骤 2:缓存设置(Redis 示例)
import redis
import hashlib
import json
redis_client = redis.Redis(host="localhost", port=6379)
def cache_key(canonical: str) -> str:
return hashlib.sha256(canonical.encode()).hexdigest()
步骤 3:带缓存的工具执行
def execute_tool_cached(tool_name: str, raw_args: dict):
tool = TOOLS[tool_name]
args = tool["input_model"](**raw_args)
canonical = json.dumps(
{
"tool": tool_name,
"version": tool["version"],
"args": args.model_dump(),
},
sort_keys=True,
separators=(",", ":")
)
key = cache_key(canonical)
cached = redis_client.get(key)
if cached:
print("CACHE HIT — skipping infra call")
return tool["output_model"](**json.loads(cached))
print("CACHE MISS — executing tool")
raw_result = tool["handler"](**args.model_dump())
validated = tool["output_model"](logs=raw_result)
redis_client.setex(
key,
tool["cache_ttl"],
validated.model_dump_json()
)
return validated
确定性缓存的执行流程
Validated tool request
↓
Canonicalization
↓
Hash generation
↓
Redis lookup
┌───────────────┐
│ Cache HIT │ → return cached result
└───────────────┘
↓
Cache MISS
↓
Execute expensive tool
↓
Validate output
↓
Sto
### 职责分离
| 问题 | 解决方案 |
|------------------------|--------------------------|
| 无效输入 | 合约驱动执行 |
| 基础设施崩溃 | 合约驱动执行 |
| 重复执行 | 确定性缓存 |
| 成本爆炸 | 确定性缓存 |
re result with TTL ↓ Return result
## 关键要点
具备自主性的 AI 系统只有在将工具执行像后端基础设施一样进行工程化,而不是把它当作大型语言模型的副作用时,才能达到生产就绪的水平。
- **合约确保执行安全。**
- **缓存使执行具备可扩展性。**
跳过任意一项都必然导致失败。