构建从 Shopify 到 Meta 营销 API 的实时数据管道
Source: Dev.to
我在过去几个月里构建了 Audience+ —— 一个能够实时将 Shopify 客户数据同步到 Meta 广告平台的工具。
下面是一份清晰、易懂的技术拆解,介绍了它的工作原理、我们解决的挑战,以及一些具体的代码模式,或许能帮助你在构建类似项目时提供参考。
我们要解决的问题
Meta 的基于浏览器的跟踪根本失效。
- iOS 14.5 应用跟踪透明度隐藏了约 75 % 的 iPhone 用户。
- Meta 像素仅保留 180 天 的数据。
- 对于大多数商店,Meta 只基于 30–40 % 的真实转化数据 进行优化。
解决方案: 直接从 Shopify 使用服务器端 API 将 第一方客户和购买数据 发送给 Meta。
架构概览
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Shopify Store │──▶│ Audience+ API│──▶│ Meta API │
│ (Webhooks) │ │ (Processing) │ │ │
└───────────────┘ └───────────────┘ └───────────────┘
│
▼
┌───────────────┐
│ PostgreSQL │
│ (Customer DB)│
└───────────────┘
技术栈
| 组件 | 选择 |
|---|---|
| 框架 | Next.js 15(App Router) |
| 语言 | TypeScript |
| API 层 | tRPC |
| 数据库 | PostgreSQL(Neon serverless) |
| ORM | Prisma |
| 认证 | Better‑Auth + Shopify OAuth |
| 托管 | Vercel |
Shopify Webhook 集成
Shopify 会为客户和订单生命周期事件发送 webhook。
我们在处理之前使用 HMAC 签名验证每个请求,以确保其真实性。
// app/api/webhooks/shopify/route.ts
import { NextRequest, NextResponse } from 'next/server';
import crypto from 'crypto';
export async function POST(req: NextRequest) {
const body = await req.text();
const hmac = req.headers.get('x-shopify-hmac-sha256');
if (!verifyShopifyWebhook(body, hmac)) {
return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
}
const topic = req.headers.get('x-shopify-topic');
const payload = JSON.parse(body);
switch (topic) {
case 'orders/create':
await handleNewOrder(payload);
break;
case 'customers/create':
await handleNewCustomer(payload);
break;
case 'customers/update':
await handleCustomerUpdate(payload);
break;
}
return NextResponse.json({ received: true });
}
function verifyShopifyWebhook(body: string, hmac: string | null): boolean {
if (!hmac) return false;
const hash = crypto
.createHmac('sha256', process.env.SHOPIFY_WEBHOOK_SECRET!)
.update(body, 'utf8')
.digest('base64');
return crypto.timingSafeEqual(Buffer.from(hash), Buffer.from(hmac));
}
客户数据规范化
客户数据经过规范化并进行哈希处理,以满足 Meta 的要求(小写、去除空格、SHA‑256)。
// lib/customer-processor.ts
import crypto from 'crypto';
interface ShopifyCustomer {
id: number;
email: string;
phone?: string;
first_name?: string;
last_name?: string;
orders_count: number;
total_spent: string;
created_at: string;
}
interface MetaUserData {
em?: string;
ph?: string;
fn?: string;
ln?: string;
external_id?: string;
}
function processCustomerForMeta(customer: ShopifyCustomer): MetaUserData {
const userData: MetaUserData = {};
if (customer.email) {
userData.em = hashForMeta(customer.email.toLowerCase().trim());
}
if (customer.phone) {
userData.ph = hashForMeta(normalizePhone(customer.phone));
}
if (customer.first_name) {
userData.fn = hashForMeta(customer.first_name.toLowerCase().trim());
}
if (customer.last_name) {
userData.ln = hashForMeta(customer.last_name.toLowerCase().trim());
}
userData.external_id = hashForMeta(customer.id.toString());
return userData;
}
function hashForMeta(value: string): string {
return crypto.createHash('sha256').update(value).digest('hex');
}
/* Helper to normalize phone numbers – implementation omitted for brevity */
function normalizePhone(phone: string): string {
// e.g., strip non‑numeric characters, ensure E.164 format, etc.
return phone.replace(/\D+/g, '');
}
Meta 营销 API 集成
我们集成了两个 Meta API:
- Custom Audiences API – 同步客户列表。
- Conversions API – 发送实时服务器端事件。
自定义受众同步
// lib/meta-audience-sync.ts
import { chunk, sleep } from './utils';
const META_API_VERSION = 'v18.0';
interface AudienceUser {
email?: string;
phone?: string;
firstName?: string;
lastName?: string;
}
async function addUsersToAudience(
audienceId: string,
users: AudienceUser[],
accessToken: string
): Promise {
const BATCH_SIZE = 10_000;
const batches = chunk(users, BATCH_SIZE);
for (const batch of batches) {
const payload = {
schema: ['EMAIL', 'PHONE', 'FN', 'LN'],
data: batch.map(user => [
user.email ? hashForMeta(user.email.toLowerCase()) : '',
user.phone ? hashForMeta(normalizePhone(user.phone)) : '',
user.firstName ? hashForMeta(user.firstName.toLowerCase()) : '',
user.lastName ? hashForMeta(user.lastName.toLowerCase()) : '',
]),
};
const response = await fetch(
`https://graph.facebook.com/${META_API_VERSION}/${audienceId}/users`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ payload, access_token: accessToken }),
}
);
if (!response.ok) {
const err = await response.text();
throw new Error(`Meta API error: ${err}`);
}
// Respect rate limits
await sleep(1000);
}
}
发送购买事件(Conversions API)
// lib/meta-conversions.ts
import crypto from 'crypto';
interface ProcessedCustomer {
emailHash?: string;
phoneHash?: string;
// …other hashed fields
}
interface ShopifyOrder {
id: number;
created_at: string;
total_price: string;
currency: string;
// …other order fields
}
async function sendPurchaseEvent(
pixelId: string,
customer: ProcessedCustomer,
order: ShopifyOrder,
accessToken: string
) {
const event = {
event_name: 'Purchase',
event_time: Math.floor(new Date(order.created_at).getTime() / 1000),
event_id: `order_${order.id}`,
action_source: 'website',
user_data: {
em: customer.emailHash,
ph: customer.phoneHash,
// add other hashed identifiers as needed
},
custom_data: {
currency: order.currency,
value: parseFloat(order.total_price),
// optional: contents, order_id, etc.
},
};
const response = await fetch(
`https://graph.facebook.com/${META_API_VERSION}/${pixelId}/events`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
data: [event],
access_token: accessToken,
}),
}
);
if (!response.ok) {
const err = await response.text();
throw new Error(`Conversions API error: ${err}`);
}
}
要点
- First‑party data 绕过基于浏览器的跟踪限制。
- Server‑side hashing 满足 Meta 的隐私要求,同时保持匹配率。
- Batching & rate‑limit handling 在处理大型受众列表时至关重要。
如果您正在构建类似的管道,上述模式应为您提供坚实的起点。祝开发愉快!
// Example: Sending a Conversions API event to Meta
const event = {
event_name: 'Purchase',
event_time: Math.floor(Date.now() / 1000),
user_data: {
em: customer.hashedEmail,
ph: customer.hashedPhone,
client_ip_address: order.client_details?.browser_ip,
client_user_agent: order.client_details?.user_agent,
},
custom_data: {
value: parseFloat(order.total_price),
currency: order.currency,
content_ids: order.line_items.map(i => i.product_id.toString()),
content_type: 'product',
num_items: order.line_items.reduce((s, i) => s + i.quantity, 0),
},
};
await fetch(
`https://graph.facebook.com/${META_API_VERSION}/${pixelId}/events`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ data: [event], access_token: accessToken }),
}
);
受众细分
客户会自动分类到与 Meta 同步的细分群体中。
enum CustomerSegment {
NEW = 'new',
ENGAGED = 'engaged',
EXISTING = 'existing',
}
function classifyCustomer(customer: CustomerWithOrders): CustomerSegment {
if (customer.orders_count === 0) return CustomerSegment.ENGAGED;
if (customer.orders_count >= 1) return CustomerSegment.EXISTING;
return CustomerSegment.NEW;
}
关键挑战与解决方案
限流
Meta 实施严格的限制。我们使用指数退避重试。
async function withRetry(
fn: () => Promise,
maxRetries = 3
): Promise {
let lastError: Error;
for (let i = 0; i < maxRetries; i++) {
try {
return await fn();
} catch (err) {
lastError = err as Error;
await sleep(2 ** i * 1000); // exponential back‑off
}
}
throw lastError!;
}
幂等的 Webhook 处理
async function processWebhookOnce(
webhookId: string,
handler: () => Promise
): Promise {
const existing = await prisma.processedWebhook.findUnique({
where: { id: webhookId },
});
if (existing) return; // already processed
await handler();
await prisma.processedWebhook.create({
data: { id: webhookId, processedAt: new Date() },
});
}
结果
使用 Audience+ 的店铺通常会看到:
- 提升 50–100 % 的转化(对 Meta 可见)
- 提升 10–20 % 的 ROAS
- 首次实现正确的排除和再营销
我会做的不同之处
- 首先使用 Conversions API。
- 更早地加入 监控。
- 从一开始就使用 队列,而不是同步处理。
试用
如果您想直接使用而不自行构建,请查看:
👉 https://www.audience-plus.com
10 分钟设置,完全自动化。
对实现有疑问吗?请在评论中提出。