如何切换到 SQS 批量操作提升性能和计费

发布: (2025年12月24日 GMT+8 03:36)
6 分钟阅读
原文: Dev.to

Source: Dev.to

请提供您希望翻译的文章正文内容,我将为您翻译成简体中文。

摘要

在本文中,我们探讨了将 SQS 消息处理从单个 SendMessage 调用重构为 Batch SendMessage 批量操作,如何显著提升应用性能并通过降低 IOPS 使用量来减少 SQS 计费成本。

思路

在使用 DataDog 监控 Golang 应用时,我们可以详细测量 SQS 消息的发送情况。通过比较传统的循环发送方式与批量发送方式,我们可以清晰地看到在时延、网络调用和资源使用方面的差异。

完整的 Datadog SQS 跟踪并非所有语言都支持

在此服务上设置以下环境变量以启用完整的负载标记:

DD_TRACE_CLOUD_REQUEST_PAYLOAD_TAGGING=all
DD_TRACE_CLOUD_RESPONSE_PAYLOAD_TAGGING=all

文档:

对于 Golang,您可以利用 Datadog 属性标签来检查负载元数据。

常规 SQS 消息发送操作

逐条发送消息会涉及多次网络调用并产生额外开销。下面的追踪图展示了使用循环操作时的时间情况。

Regular send timing diagram

示例: 单独发送 7 条消息 共耗时 175 ms,其中包含 7 次独立的 HTTP 请求。首次调用通常因 DNS 查询和连接建立而主导整体耗时。

由于服务运行在同一 K8s 集群中,我们可以认为实验环境干净,没有额外的开销。

批量发送消息

AWS SQS 允许每批最多发送 10 条消息。在 2 批 中发送 20 条消息 展示了显著的效率提升:

  • 发送的消息数量增加了 3 倍。
  • HTTP 请求减少了 10 倍。
  • 总处理时间约减少了 ≈ 3 倍。

批量发送时序图

When a batch send is performed, the response contains a status for each message, including any errors. The batch can be partially successful; parsing the response allows you to efficiently replay or handle failures.

当执行批量发送时,响应会包含每条消息的状态,包括任何错误。批次可能部分成功;解析响应可以让您高效地重放或处理失败。

{
  "Successful": [
    { "ID": "0", "MessageID": "655f3404-fbe4-4c51-8868-b5c604bd5f6d", "Error": null },
    { "ID": "1", "MessageID": "daf36653-9abb-490b-b620-608efa24a219", "Error": null },
    { "ID": "2", "MessageID": "93f4dcfd-0500-4076-90f2-3b880b32c943", "Error": null },
    { "ID": "3", "MessageID": "f6c7b079-98f5-4290-b293-2ac6e43ed6f2", "Error": null },
    { "ID": "4", "MessageID": "2b4a96bc-b4ec-4711-9473-d887dd3213f7", "Error": null },
    { "ID": "5", "MessageID": "1bd30cd9-f9c1-4b47-8d6d-2e23ce771841", "Error": null },
    { "ID": "6", "MessageID": "8eed75ef-2563-442e-a191-6b3dff29d635", "Error": null },
    { "ID": "7", "MessageID": "c65a36ce-7ce0-444c-9974-96648dcae0ea", "Error": null },
    { "ID": "8", "MessageID": "75379265-5f9-4a60-8c3a-0537cffdaa80", "Error": null },
    { "ID": "9", "MessageID": "59239903-d4d9-498f-9a08-6d7d7ae8beba", "Error": null },
    { "ID": "10", "MessageID": "9a614c58-113b-487d-a8f1-7509f93b42f9", "Error": null },
    { "ID": "11", "MessageID": "1077de5c-8f0f-4d5b-a0fe-dca45712bfdf", "Error": null },
    { "ID": "12", "MessageID": "8b0f5836-0e01-4a88-9793-4bac2a6d879a", "Error": null }
  ],
  "Failed": []
}

AWS Console behavior

Batch sending does not change how messages appear in SQS. Each message is stored individually, so consumers don’t need any changes to handle batches.

SQS console view

相同的消息,具有相同的结构,已发布并在 SQS 中可见。可以采用其他优化技术在从 SQS 拉取消息时调整消费者的批量大小。

Golang 实现示例

entry := &sqs.SendMessageBatchRequestEntry{
    Id:          aws.String(fmt.Sprintf("%d", i+idx)), // Unique ID within batch
    MessageBody: aws.String(string(b)),
}

if taskConfig.MessageGroupId != "" {
    entry.SetMessageGroupId(taskConfig.MessageGroupId)
}
if taskConfig.MessageDeduplicationId != "" {
    entry.SetMessageDeduplicationId(taskConfig.MessageDeduplicationId)
}
if taskConfig.DelaySeconds > 0 {
    entry.SetDelaySeconds(taskConfig.DelaySeconds)
}
entries = append(entries, entry)
}

// ...

type BatchResult struct {
    Successful []BatchResultEntry
    Failed     []BatchResultEntry
}

代码示例

// BatchResultEntry represents a single entry in a batch result
type BatchResultEntry struct {
    ID        string
    MessageID string
    Error     error
}

// Send batch
input := &sqs.SendMessageBatchInput{
    QueueUrl: stp.url,
    Entries:  entries,
}

output, err := stp.c.SendMessageBatchWithContext(ctx, input)
if err != nil {
    err = handleSqsErrors(err)
    // Mark all entries in this batch as failed
    for idx := range batch {
        result.Failed = append(result.Failed, BatchResultEntry{
            ID:    fmt.Sprintf("%d", i+idx),
            Error: err,
        })
    }
    continue
}

专用消息详情

Message details screenshot

正是这个 messageID 在批量成功响应部分返回的。

需要检查和优化的其他事项

去重技术

在发送消息之前,执行去重。这可以降低 SQS 的 IOPS 使用量,减少处理延迟,减轻消费者端的负载。同时还能避免不必要的存储读取、重写等操作。

分布式追踪框架可能会占用 SQS 批处理槽位

一些分布式追踪框架会通过 SQS 等异步传输渠道传播元信息。如果您使用了这些框架,请检查相应的集成——它们可能会影响最大批处理大小。

  • Datadog 使用一个批处理元素来传播带有追踪的元信息,该信息将被消费并应用于同一追踪。
  • AWS X‑Ray(一种专有的 AWS 技术)在 SQS 批处理中占用任何槽位;它通过 UDP 服务器发送 span/trace 信息。

限制

  • 消息大小负载:1 MiB
  • 批量大小:10 条消息
  • 负载序列化:JSON

结论

  • 显著降低整体处理时间。
  • 减少网络往返次数。
  • 降低 SQS 费用(API 调用次数更少,约减少 10 倍)。

链接

Back to Blog

相关文章

阅读更多 »