使用 API Gateway WebSocket API 更新数据状态

发布: (2025年12月4日 GMT+8 18:58)
7 min read
原文: Dev.to

Source: Dev.to

TL;DR

Amazon API Gateway 的 WebSocket API 可以让前端实时获取最新数据,提供一种接近实时的替代方案,避免轮询 REST API 端点来获取状态通知。

场景描述

假设我们运行一个点击数据处理应用。用户在页面上点击链接和按钮,应用会记录点击的数量和类型。一个经过授权的基于浏览器的仪表盘展示当前的点击计数。前端只负责展示数据——不包含业务逻辑。

点击结果会持久化到数据库,并由后端供客户端读取。问题是如何在接近实时的情况下显示当前的点击计数状态。

需求

  • 仪表盘应实时或近实时显示点击情况。
  • 最小化人工干预,解决方案应尽可能自动化。
  • 点击结果必须持久化到数据库。
  • 解决方案需要具备可扩展性。
  • 架构必须可扩展,其组件能够在其他业务场景中复用。
  • 基础设施必须托管在 AWS 上,并采用无服务器(serverless)方式。

选项

方法优点缺点
轮询实现简单;可直接使用已有的 REST API。在大规模时成本高;增加延迟;需要管理轮询器(间隔、清理)。
WebSockets基于推送,近实时更新;降低网络流量;无需轮询逻辑。需要设置 WebSocket API 并管理连接 ID。

测试表明轮询虽然可行,但成本较高,于是转向了 WebSocket 方案。

架构概览

为简洁起见,图示已省略。

该架构包括:

  1. 数据摄取/ingest REST 端点(API Gateway) → IngestClickData Lambda → DynamoDB 表。
  2. WebSocket API$connect$disconnect$default 路由。
  3. 连接 ID 持久化PersistConnectionIds Lambda 将连接 ID 存入 DynamoDB。
  4. 流处理 – DynamoDB Streams 触发 StreamProcessor Lambda,向所有已连接的客户端广播点击更新。
  5. 仪表盘客户端 – 连接到 WebSocket API 并接收实时点击状态消息。

前置条件

  • 拥有可以创建 API Gateway(REST 与 WebSocket)、Lambda、DynamoDB 以及 DynamoDB Streams 的 AWS 账户权限。
  • 熟悉基础设施即代码工具(如 AWS CDK)会有帮助,但不是必须的。

实现细节

数据摄取

当用户点击元素时,前端将点击数据发送到 /ingest REST 端点。IngestClickData Lambda 对负载进行校验(可选的转换),并使用 PutItem 操作写入 DynamoDB。

**注意:**如果不需要校验/转换,API Gateway 可以直接写入 DynamoDB,省去 Lambda,节省数十毫秒的延迟。

仪表盘连接(WebSocket 客户端)

// connectWebSocket.js
function connectWebSocket() {
  // 1. 创建 WebSocket 对象
  const ws = new WebSocket(WEBSOCKET_URL);

  // 2. 连接打开
  ws.onopen = () => {
    updateConnectionStatus(ConnectionStatus.CONNECTED);
  };

  // 3. 收到消息
  ws.onmessage = handleMessage;

  // 4. 连接关闭
  ws.onclose = (event) => {
    updateConnectionStatus(ConnectionStatus.DISCONNECTED);
    // 此处可加入可选的重连逻辑
  };

  // 5. 发生错误
  ws.onerror = (error) => {
    updateConnectionStatus(ConnectionStatus.DISCONNECTED);
    // onerror 总是随后会触发 onclose
  };
}

WEBSOCKET_URL 的格式与 REST API 相同,例如:

wss://.execute-api.eu-central-1.amazonaws.com//

使用 IaC(如 CDK)时,可在构建时将 URL 注入到客户端代码包中。

WebSocket 路由

  • $connect – 触发 PersistConnectionIds Lambda,将生成的连接 ID 存入 DynamoDB。
  • $disconnect – 触发 DeleteConnectionIds Lambda,在仪表盘客户端离开时删除该连接 ID。
  • $default – 本示例未使用;作为兜底路由。

DynamoDB Streams 与 StreamProcessor

IngestClickData 将新的点击记录写入 DynamoDB,产生 Streams 事件。StreamProcessor Lambda 消费这些事件,过滤出点击数据项,并通过 WebSocket API 将更新的状态广播给所有已存储的连接 ID。

如果点击数据和连接 ID 共用同一张表,可通过事件过滤减少不必要的调用:

// CDK 示例 – 过滤 entityType = 'CLICK' 的项目
streamProcessorLambda.addEventSource(
  new lambdaEventSources.DynamoEventSource(clickstreamTable, {
    // ... 其他属性
    filters: [
      lambda.FilterCriteria.filter({
        dynamodb: {
          NewImage: {
            entityType: {
              S: lambda.FilterRule.isEqual('CLICK')
            }
          }
        }
      })
    ]
  })
);

广播点击数据

StreamProcessor 从 DynamoDB 中检索所有活跃的连接 ID,并使用 API Gateway Management API 向每个客户端发送消息:

// 伪代码
for (const connId of connectionIds) {
  await apigatewaymanagementapi.postToConnection({
    ConnectionId: connId,
    Data: JSON.stringify(clickStatus)
  }).promise();
}

接收点击结果(客户端)

function handleMessage(event) {
  const data = JSON.parse(event.data);
  // 使用新的点击计数更新 UI
  updateClickCount(data.count);
}

讨论要点

是否真的需要 REST API?

如果摄取路径不需要校验,可以直接让 API Gateway 写入 DynamoDB,进一步降低延迟和成本。

异步工作流

该架构天然支持异步处理:点击事件被存储、流式传输并广播,整个过程不阻塞客户端。

灵活性

  • 分离表 – 点击数据和连接 ID 可以存放在不同的表中,以实现关注点分离。
  • 额外消费者 – 其他服务(如分析、告警)可以订阅同一 DynamoDB Stream。
  • 可扩展性 – 可自动调节 Lambda 并发和 DynamoDB 预置容量。

总结

通过结合 REST 摄取端点、DynamoDB Streams 与 API Gateway WebSocket API,我们实现了近实时的仪表盘更新,省去了客户端轮询的开销。该方案完全无服务器、可扩展,并且能够在不同业务场景中复用。

Back to Blog

相关文章

阅读更多 »