WhatsApp用户最常见的习惯(以及如何应对)
Source: Dev.to
AWS 专家解决方案架构师
Applied AI @ AWS
所表达的观点仅代表我个人,并不代表我的雇主的观点或立场。
概览
了解如何通过将快速连续的消息缓冲为单条连贯的消息来优化 WhatsApp‑to‑Amazon Connect 集成。本分步指南使用以下全部架构组件:
- AWS CDK
- DynamoDB Streams(带滚动窗口)
- AWS Lambda
- AWS End User Messaging Social
- Amazon Connect
当客户通过 WhatsApp 联系时,他们很少只发送一条消息。他们打字很快:
Hello
I need help
with my order
P12345
每条消息都会触发一个单独的 webhook 事件,如果不进行任何优化,每条消息都会在 Amazon Connect 中生成一条独立的聊天记录。结果会是:
- 对话被碎片化
- 坐席感到困惑
- 不必要的成本
在 AI 驱动的聊天中,我们通常会在坐席仍在处理时阻止用户发送额外消息。对于异步、程序化的消息我们无法控制这一点,但我们 可以 控制在回复之前的等待时长。本博客不仅帮助解决 WhatsApp‑to‑Connect 场景,也适用于任何面临相同挑战的聊天渠道(SMS、社交媒体私信等)。
您将学习如何使用 消息缓冲层 将快速连续的 WhatsApp 消息聚合为单条连贯的消息,然后再转发到 Amazon Connect。
Code repository:
Source: …
架构概述
在 AWS End User Messaging Social 与 Amazon Connect 之间添加一个缓冲层,功能如下:
- 将收到的 WhatsApp 消息写入 DynamoDB 表。
- 使用带有滚动窗口的 DynamoDB Streams 对消息进行缓冲。
- 将同一发送者的连续文本消息合并为一条。
- 将合并后的消息作为单条聊天消息转发给 Amazon Connect。
结果: 坐席看到的是整洁、自然的对话,而不是大量碎片化的消息。
数据流
- WhatsApp → AWS End User Messaging Social → 将消息发布到 SNS 主题。
- Lambda
on_raw_messages将每条消息存入 DynamoDB 表raw_messages。 - DynamoDB Streams 触发 Lambda
message_aggregator,使用滚动窗口作为缓冲。 - 聚合器对同一发送者的文本消息进行分组、排序并拼接。
- 将聚合后的消息转发给 WhatsApp 事件处理器,该处理器创建/更新 Amazon Connect 的聊天会话。
问题示例
当用户快速发送多条消息时:
| 消息编号 | 内容 |
|---|---|
| 1 | Hello |
| 2 | I need help |
| 3 | with my order |
| 4 | P12345 |
如果不进行缓冲,每条消息都会成为单独的 Amazon Connect 聊天消息,导致:
- 对话被拆分,坐席难以跟进。
- 成本更高(每条消息单独计费)。
- 多次下游 Lambda 调用。
DynamoDB 表:raw_messages
- 分区键:
from(发送者电话号码) - 排序键:
id(消息 ID) - TTL: 已启用,用于自动清理
- Streams: 已启用,使用滚动窗口
使用 from 作为分区键可确保来自同一用户的消息被存储在一起并落在同一个分片上,从而保证流的顺序处理。
Lambda:存储原始消息
import json, decimal, os, boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.getenv('RAW_MESSAGES_TABLE'))
def lambda_handler(event, context):
records = event.get("Records", [])
for record in records:
sns = record.get("Sns", {})
sns_message = json.loads(sns.get("Message", "{}"), parse_float=decimal.Decimal)
webhook_entry = json.loads(sns_message.get("whatsAppWebhookEntry", "{}"))
for change in webhook_entry.get("changes", []):
value = change.get("value", {})
metadata = value.get("metadata", {})
contacts = value.get("contacts", [])
for message in value.get("messages", []):
item = message.copy()
item["metadata"] = metadata
item["messaging_product"] = value.get("messaging_product")
# Store in DynamoDB
table.put_item(Item=item)
return {'statusCode': 200}
缓冲策略:翻滚窗口
翻滚窗口是缓冲策略的关键。DynamoDB Streams 会触发 message_aggregator Lambda,但并不是对每条记录都调用它,而是窗口会等待可配置的秒数(默认 20 秒),然后一次性使用该窗口中 所有累计的记录 调用函数。
- 每个分片一个 Lambda → 窗口内同一用户的消息会一起处理。
聚合器 Lambda
聚合器按发送者分组消息,按时间戳排序,并将连续的 text 消息用换行符连接。非文本消息(图片、音频、文档等)保持为单独的项目。
输入示例
Message 1: Hello
Message 2: I need help
Message 3: with my order
Message 4: P12345
输出(单个聚合消息)
Hello
I need help
with my order
P12345
核心聚合逻辑
import os, json, boto3
from boto3.dynamodb.types import TypeDeserializer
lambda_client = boto3.client('lambda')
deserializer = TypeDeserializer()
def deserialize_dynamodb(image):
return {k: deserializer.deserialize(v) for k, v in image.items()}
def aggregate_all_messages(records):
# Group by sender
grouped = {}
for rec in records:
sender = rec['from']
grouped.setdefault(sender, []).append(rec)
aggregated = []
for sender, msgs in grouped.items():
# Sort by timestamp (or id) to preserve order
msgs.sort(key=lambda x: x.get('timestamp') or x.get('id'))
# Concatenate consecutive text messages
text_parts = [m['body'] for m in msgs if m.get('type') == 'text']
aggregated_body = "\n".join(text_parts)
# Preserve non‑text items (optional handling)
non_text = [m for m in msgs if m.get('type') != 'text']
agg_msg = {
"from": sender,
"aggregated_body": aggregated_body,
"original_messages": msgs,
"non_text_attachments": non_text
}
aggregated.append(agg_msg)
return aggregated
def lambda_handler(event, context):
raw_records = event.get("Records", [])
records = []
for record in raw_records:
if record.get("eventName") == "INSERT":
new_image = record.get("dynamodb", {}).get("NewImage", {})
deserialized = deserialize_dynamodb(new_image)
records.append(deserialized)
if not records:
return {"state": event.get('state', {})}
aggregated = aggregate_all_messages(records)
# Forward each aggregated message to the WhatsApp event handler
for agg in aggregated:
lambda_client.invoke(
FunctionName=os.environ['WHATSAPP_EVENT_HANDLER'],
InvocationType='Event',
Payload=json.dumps(agg)
)
return {"state": event.get('state', {})}
最后一步:转发至 Amazon Connect
一旦聚合完成,Lambda 异步调用 WhatsApp 事件处理程序,该处理程序会使用合并后的消息创建或更新 Amazon Connect 聊天会话。坐席现在只会看到 一条聊天记录,而不是大量碎片化的消息。
Takeaways
- 缓冲 快速连续的消息可以减少聊天碎片化。
- 滚动窗口 在 DynamoDB Streams 上提供一种简单、无服务器的批处理消息方式。
- 该模式适用于 任何 聊天渠道(WhatsApp、短信、社交媒体私信等)。
欢迎在上方链接的 GitHub 仓库中查看完整实现。祝开发愉快!
Benefits of Message Buffering
- Cleaner conversation flow – 多条快速消息会合并为单条连贯的消息。
- Cost optimisation – 更少的下游消息可降低 Amazon Connect Chat 成本。
- Automatic cleanup – TTL 会自动删除旧的原始消息。
- Scalable – DynamoDB Streams 能处理高吞吐量(每个流最高 10 000 条记录)。
- Reliable – 流处理保证至少一次投递(不会丢失消息)。
- Example scenario – 1 000 条原始消息聚合为 250 条消息(4 : 1 比例)。消息由人工坐席回复。
Cost Comparison
| Component | Without Buffering | With Buffering | Savings |
|---|---|---|---|
| DynamoDB + Streams | – | ≈ $0.0013 | – |
| Lambda (all functions) | – | ≈ $0.00078 | – |
| Buffering Infrastructure | $0.00 | ≈ $0.002 | – |
| Inbound API Calls | 1 000 calls | 250 calls | 75 % fewer calls |
| Connect Chat (In) Cost | $4.00 | $1.00 | $3.00 |
| Total | $4.00 | ≈ $1.00 | ≈ $3.00 (75 % reduction) |
Note: 总成本包括 Connect 的入站/出站费用以及终端用户消息(EUM)的入站/出站费用。在本示例中,我们仅减少了 Amazon Connect Chat 的入站消息。
- Connect Chat cost: $0.004 × msg (in) + $0.004 × msg (out) – 详见官方定价页面。
- EUM cost: $0.005 × msg (in) + $0.005 × msg (out) – 详见官方定价页面。
前置条件
-
WhatsApp Business Account (WABA) – 创建一个新的账号或将现有账号迁移到 AWS。
- 拥有或创建一个 Meta Business Account。
- 打开 AWS End User Messaging (EUM) Social 控制台,通过嵌入的 Facebook 门户将您的业务账号关联。
- 获取能够接收 SMS/语音验证的电话号码,并将其添加到 WhatsApp。
- ⚠️ 请勿使用个人 WhatsApp 号码。
-
Amazon Connect 实例 – 如果没有实例,请参照 Amazon Connect 设置指南 操作。
-
实例标识符 – 在 Amazon Connect 控制台中定位以下 ARN:
arn:aws:connect:::instance/INSTANCE_ID arn:aws:connect:::instance/INSTANCE_ID/contact-flow/CONTACT_FLOW_ID -
联系流 – 创建(或复用)一个入站联系流,定义用户体验并发布。
-
区域对齐 – 将所有资源部署在与您的 AWS End User Messaging WhatsApp 号码配置相同的区域。
部署步骤
# Clone the sample repository
git clone https://github.com/aws-samples/sample-whatsapp-end-user-messaging-connect-chat.git
cd sample-whatsapp-end-user-messaging-connect-chat/whatsapp-eum-connect-chat
# Follow the CDK Deployment Guide (README.md) to deploy the stack
部署完成后,请使用您的 Amazon Connect 详细信息更新 SSM 参数 /whatsapp_eum_connect_chat/config:
{
"instance_id": "",
"contact_flow_id": "",
"chat_duration_minutes": 60,
"ignore_reactions": "yes",
"ignore_stickers": "yes"
}
参数参考
| 参数 | 描述 |
|---|---|
instance_id | 您的 Amazon Connect 实例 ID |
contact_flow_id | 聊天的入站联系流 ID |
chat_duration_minutes | 聊天会话保持活跃的时长(默认 = 60) |
ignore_reactions | 是否忽略 WhatsApp 表情回应(默认 = yes) |
ignore_stickers | 是否忽略 WhatsApp 贴纸(默认 = yes) |
将堆栈连接到 AWS 终端用户消息
-
检索 SNS 主题 ARN
- 打开 AWS Systems Manager → Parameter Store。
- 复制
/whatsapp_eum_connect_chat/topic/in的值(以arn:aws:sns开头)。
-
在 EUM Social 控制台中配置目标
- 选择 Destination → Amazon SNS。
- 粘贴复制的主题 ARN。
可选调整
-
缓冲窗口 – 默认是 20 秒。要更改它,请编辑
config.py中的BUFFER_IN_SECONDS并重新部署:BUFFER_IN_SECONDS = 20 # 根据需要修改(秒) -
测试 – 打开 Amazon Connect 联系控制面板 (CCP),向 EUM 号码发送 WhatsApp 消息,观察快速发送的消息被聚合为 Connect 中的一条消息。
-
进一步增强
- 根据使用场景调整缓冲窗口(实时需求使用更短,成本节约使用更长)。
- 为处理失败的流添加 死信队列 (Dead‑Letter Queue)。
- 实现自定义聚合逻辑(例如,将图片一起分组)。
- 与 代理发起的 WhatsApp 解决方案结合,实现完整的双向通信。
有用链接
- 项目仓库 –
- Amazon Connect 管理员指南 –
- Amazon Connect API 参考 –
- AWS 终端用户消息社交用户指南 –
- DynamoDB Streams 开发者指南 –
- 使用 Amazon DynamoDB 和 AWS Lambda 构建可扩展的事件驱动架构 –
AWS 专家解决方案架构师 – Applied AI @ AWS
此处表达的观点仅代表本人个人,不代表雇主的立场。