扩展 AI 代理:使用 C# 掌握弹性、状态和吞吐量

发布: (2026年2月10日 GMT+8 04:00)
10 分钟阅读
原文: Dev.to

看起来您只提供了来源链接,而没有贴出需要翻译的正文内容。请把您想要翻译的文本(文章正文)粘贴在这里,我就可以帮您把它翻译成简体中文,同时保留原始的格式和代码块。谢谢!

高性能 AI 代理架构

想象一下周五晚上高峰时段的高档餐厅。厨房一片混乱:订单堆积,厨师满头大汗,一只掉落的盘子就意味着整张桌子的订单全部丢失。

现在把这个情景映射到你的 AI 基础设施上。如果你的 GPU 集群 是厨房,而你的 AI 代理 是厨师,当用户请求的 “晚餐高峰” 来临时会怎样?

  • 没有弹性伸缩 → 系统崩溃。
  • 没有状态持久化 → 对话丢失。
  • 没有吞吐量优化 → 高延迟和云费用飙升。

在大规模部署容器化 AI 代理不仅仅是把模型装进 Docker;更是要编排资源的动态协作。本指南将拆解实现 弹性、云原生服务 所需的架构支柱,使用现代 C# 和 Kubernetes 将一个简单的 AI 模型转变为可在生产环境中可靠运行的系统。

架构蓝图

PillarAnalogyGoal
Elastic Scaling经理对波动的需求作出响应。
State Persistence记忆在 pod 崩溃时保持对话持续。
Throughput Optimization装配线通过批处理最大化硬件使用率。

1️⃣ 弹性伸缩(基于意图)

在标准的 Kubernetes 部署中,你会根据 CPURAM 使用率进行伸缩。对于 AI 代理来说,这些指标具有误导性:

  • GPU 在处理大批量任务时可能显示 100 % 利用率,但也可能在等待网络响应时闲置。
  • 真正的瓶颈是 队列深度(有多少请求在等待 GPU)和 推理延迟(首 token 响应时间,TTFT)。

使用 System.Diagnostics.Metrics 进行监控

using System.Diagnostics;
using System.Diagnostics.Metrics;

public class InferenceMetrics
{
    private static readonly Meter _meter = new("AI.Agent.Inference");

    // Latency of generating a response (ms)
    private static readonly Histogram<double> _generationLatency =
        _meter.CreateHistogram<double>("agent.generation.latency.ms", "ms",
            "Time taken to generate a response");

    // Number of requests waiting for inference
    private static readonly ObservableGauge<int> _queueDepth =
        _meter.CreateObservableGauge<int>("agent.queue.depth",
            () => RequestQueue.Count, // callback to read current queue size
            "requests",
            "Number of requests waiting for inference");

    public void RecordLatency(double latencyMs) => _generationLatency.Record(latencyMs);
}

优势: 通过将伸缩触发条件从通用的 CPU 使用率转向领域特定的指标(延迟 / 队列深度),水平 Pod 自动伸缩器(HPA)能够 主动 伸缩,从而保持用户体验。

2️⃣ 状态持久化(短期记忆)

AI 代理在会话期间是有状态的:它们依赖之前的消息、工具输出和记忆。而容器是 短暂的。如果 Pod A 崩溃,其内存中的对话历史会消失。

使用 IDistributedCache 的分布式缓存

using Microsoft.Extensions.Caching.Distributed;
using System.Text.Json;

public interface IAgentStateStore
{
    Task<T?> GetStateAsync<T>(string sessionId, CancellationToken ct);
    Task SetStateAsync<T>(string sessionId, T state, CancellationToken ct);
}

public class RedisAgentStateStore : IAgentStateStore
{
    private readonly IDistributedCache _cache;
    public RedisAgentStateStore(IDistributedCache cache) => _cache = cache;

    public async Task<T?> GetStateAsync<T>(string sessionId, CancellationToken ct)
    {
        byte[]? data = await _cache.GetAsync(sessionId, ct);
        if (data == null) return default;

        // 高性能反序列化(.NET 8 中的源生成)
        return JsonSerializer.Deserialize<T>(data);
    }

    public async Task SetStateAsync<T>(string sessionId, T state, CancellationToken ct)
    {
        byte[] data = JsonSerializer.SerializeToUtf8Bytes(state);
        var options = new DistributedCacheEntryOptions
        {
            SlidingExpiration = TimeSpan.FromMinutes(30) // 驱逐不活跃的会话
        };
        await _cache.SetAsync(sessionId, data, options, ct);
    }
}

优势: Pods 变为 无状态——它们只承载逻辑和模型权重。如果某个 pod 崩溃,下一次请求会从 Redis 中获取会话状态,继续执行而不会丢失信息。

3️⃣ 吞吐量优化(批处理)

逐个处理 AI 请求就像一次只上一道菜——昂贵的 GPU 资源被严重闲置。请求批处理会将多个请求聚合到一次前向传播中。

生产者‑消费者 与 System.Threading.Channels

using System.Threading.Channels;

public class BatchingService
{
    private readonly Channel<InferenceRequest> _channel;
    private readonly TimeSpan _maxBatchWait = TimeSpan.FromMilliseconds(20);
    private readonly int _maxBatchSize = 32;

    public BatchingService()
    {
        var options = new BoundedChannelOptions(_maxBatchSize * 2)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _channel = Channel.CreateBounded<InferenceRequest>(options);
    }

    // Producer: called by the HTTP endpoint
    public async ValueTask EnqueueAsync(InferenceRequest request, CancellationToken ct)
        => await _channel.Writer.WriteAsync(request, ct);

    // Consumer: background worker
    public async Task RunAsync(CancellationToken ct)
    {
        var batch = new List<InferenceRequest>(_maxBatchSize);
        while (!ct.IsCancellationRequested)
        {
            // Wait for at least one item
            if (await _channel.Reader.WaitToReadAsync(ct))
            {
                while (_channel.Reader.TryRead(out var item))
                {
                    batch.Add(item);
                    if (batch.Count >= _maxBatchSize) break;
                }

                // Optional time‑based flush
                var flushTask = Task.Delay(_maxBatchWait, ct);
                var completed = await Task.WhenAny(flushTask,
                    _channel.Reader.WaitToReadAsync(ct).AsTask());

                // Execute batch
                await ProcessBatchAsync(batch, ct);
                batch.Clear();
            }
        }
    }

    private Task ProcessBatchAsync(List<InferenceRequest> batch, CancellationToken ct)
    {
        // TODO: Call the model with the aggregated inputs
        // Record latency metrics, update queue depth, etc.
        return Task.CompletedTask;
    }
}

Win: GPU 处理的是 单个大张量 而不是多个小张量,从而显著提升吞吐量并降低每个请求的成本。

Source:

综合运用

  1. 暴露指标InferenceMetrics)→ HPA 根据延迟/队列深度自动扩缩 Pod。
  2. 持久化会话状态RedisAgentStateStore)→ Pod 保持无状态,实现快速恢复。
  3. 批处理传入请求BatchingService + Channels)→ 最大化 GPU 利用率。

有了这三大支柱,你的 AI 服务就能从容应对“周五夜间高峰”,提供低延迟响应、保留对话上下文,并将云费用控制在合理范围内。 🚀

private readonly Channel<InferenceRequest> _channel;
private readonly ModelRunner _modelRunner;

public BatchingService(ModelRunner modelRunner)
{
    // Bounded channel prevents memory exhaustion (Back‑pressure)
    _channel = Channel.CreateBounded<InferenceRequest>(new BoundedChannelOptions(1000)
    {
        FullMode = BoundedChannelFullMode.Wait
    });
    _modelRunner = modelRunner;
}

public async ValueTask EnqueueAsync(InferenceRequest request)
{
    await _channel.Writer.WriteAsync(request);
}

public async Task ProcessBatchesAsync(CancellationToken stoppingToken)
{
    await foreach (var batch in ReadBatchesAsync(stoppingToken))
    {
        await _modelRunner.ExecuteBatchAsync(batch);
    }
}

private async IAsyncEnumerable<List<InferenceRequest>> ReadBatchesAsync(
    [EnumeratorCancellation] CancellationToken ct)
{
    var batch = new List<InferenceRequest>(capacity: 32);
    var timer = Task.Delay(TimeSpan.FromMilliseconds(10), ct);

    await foreach (var request in _channel.Reader.ReadAllAsync(ct))
    {
        batch.Add(request);

        // Condition 1: Batch is full
        if (batch.Count >= 32)
        {
            yield return batch;
            batch = new List<InferenceRequest>(32);
            timer = Task.Delay(TimeSpan.FromMilliseconds(10), ct);
        }
        // Condition 2: Timeout (latency optimisation)
        else if (batch.Count > 0 && await Task.WhenAny(timer, Task.CompletedTask) == timer)
        {
            yield return batch;
            batch = new List<InferenceRequest>(32);
            timer = Task.Delay(TimeSpan.FromMilliseconds(10), ct);
        }
    }
}

架构收益

  • 吞吐量 – 批处理最大化每个 GPU 周期的工作量,减少所需 Pod 数量,降低成本。
  • 权衡 – 引入了延迟与吞吐量的平衡,可通过 批大小超时时间 参数进行调优。

这三个概念构成了一个统一的自愈系统:

  1. 流量 进入并通过 System.Threading.Channels 入队。
  2. 批处理服务 将请求分组并从 Redis 获取代理状态。
  3. 模型 处理批次;指标 记录延迟。
  4. HPA 控制器 读取自定义指标,若延迟激增则扩容 Pod。
  5. 新 Pod 启动,连接 Redis,加入队列处理。

扩展 AI 代理

超越简单容器化需要:

  • 弹性伸缩 与自定义指标。
  • 状态持久化 通过分布式缓存(例如 Redis)。
  • 吞吐量优化 与请求批处理。

掌握这些可将脆弱的原型转变为稳健的云原生强力引擎。

现代 .NET 实践

  • 利用 System.Threading.Channels 实现具备背压感知的队列。
  • 使用 System.Diagnostics.Metrics 进行符合惯用法、低开销的遥测。

讨论提示

  1. 吞吐量 vs. 延迟 – 根据你的经验,在面向用户的聊天代理中,批处理(提升吞吐量)与实时处理(降低延迟)之间的权衡是否值得,还是应该不惜一切代价优先保证低延迟?
  2. 状态持久化 – 在容器化环境中,你目前是如何处理状态的?是依赖外部数据库,还是已经找到在 Pod 生命周期内有效保持状态的方法?

这里展示的概念和代码直接取自电子书 Cloud‑Native AI & Microservices: Containerizing Agents and Scaling Inference(Leanpub)中详尽的路线图。

0 浏览
Back to Blog

相关文章

阅读更多 »

解锁笔记本电脑 GPU 的隐藏力量

概述:大多数现代笔记本电脑都配备了强大的 GPU,但往往未被充分利用。无论你是运行本地 LLM 的软件工程师,还是数据科学家……