构建从 Shopify 到 Meta 营销 API 的实时数据管道

发布: (2025年12月19日 GMT+8 08:41)
7 min read
原文: Dev.to

Source: Dev.to

我在过去几个月里构建了 Audience+ —— 一个能够实时将 Shopify 客户数据同步到 Meta 广告平台的工具。

下面是一份清晰、易懂的技术拆解,介绍了它的工作原理、我们解决的挑战,以及一些具体的代码模式,或许能帮助你在构建类似项目时提供参考。

我们要解决的问题

Meta 的基于浏览器的跟踪根本失效。

  • iOS 14.5 应用跟踪透明度隐藏了约 75 % 的 iPhone 用户。
  • Meta 像素仅保留 180 天 的数据。
  • 对于大多数商店,Meta 只基于 30–40 % 的真实转化数据 进行优化。

解决方案: 直接从 Shopify 使用服务器端 API 将 第一方客户和购买数据 发送给 Meta。

架构概览

┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│ Shopify Store │──▶│  Audience+ API│──▶│   Meta API    │
│  (Webhooks)   │   │ (Processing)  │   │               │
└───────────────┘   └───────────────┘   └───────────────┘


                  ┌───────────────┐
                  │  PostgreSQL   │
                  │ (Customer DB)│
                  └───────────────┘

技术栈

组件选择
框架Next.js 15(App Router)
语言TypeScript
API 层tRPC
数据库PostgreSQL(Neon serverless)
ORMPrisma
认证Better‑Auth + Shopify OAuth
托管Vercel

Shopify Webhook 集成

Shopify 会为客户和订单生命周期事件发送 webhook。
我们在处理之前使用 HMAC 签名验证每个请求,以确保其真实性。

// app/api/webhooks/shopify/route.ts
import { NextRequest, NextResponse } from 'next/server';
import crypto from 'crypto';

export async function POST(req: NextRequest) {
  const body = await req.text();
  const hmac = req.headers.get('x-shopify-hmac-sha256');

  if (!verifyShopifyWebhook(body, hmac)) {
    return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
  }

  const topic = req.headers.get('x-shopify-topic');
  const payload = JSON.parse(body);

  switch (topic) {
    case 'orders/create':
      await handleNewOrder(payload);
      break;
    case 'customers/create':
      await handleNewCustomer(payload);
      break;
    case 'customers/update':
      await handleCustomerUpdate(payload);
      break;
  }

  return NextResponse.json({ received: true });
}

function verifyShopifyWebhook(body: string, hmac: string | null): boolean {
  if (!hmac) return false;

  const hash = crypto
    .createHmac('sha256', process.env.SHOPIFY_WEBHOOK_SECRET!)
    .update(body, 'utf8')
    .digest('base64');

  return crypto.timingSafeEqual(Buffer.from(hash), Buffer.from(hmac));
}

客户数据规范化

客户数据经过规范化并进行哈希处理,以满足 Meta 的要求(小写、去除空格、SHA‑256)。

// lib/customer-processor.ts
import crypto from 'crypto';

interface ShopifyCustomer {
  id: number;
  email: string;
  phone?: string;
  first_name?: string;
  last_name?: string;
  orders_count: number;
  total_spent: string;
  created_at: string;
}

interface MetaUserData {
  em?: string;
  ph?: string;
  fn?: string;
  ln?: string;
  external_id?: string;
}

function processCustomerForMeta(customer: ShopifyCustomer): MetaUserData {
  const userData: MetaUserData = {};

  if (customer.email) {
    userData.em = hashForMeta(customer.email.toLowerCase().trim());
  }

  if (customer.phone) {
    userData.ph = hashForMeta(normalizePhone(customer.phone));
  }

  if (customer.first_name) {
    userData.fn = hashForMeta(customer.first_name.toLowerCase().trim());
  }

  if (customer.last_name) {
    userData.ln = hashForMeta(customer.last_name.toLowerCase().trim());
  }

  userData.external_id = hashForMeta(customer.id.toString());

  return userData;
}

function hashForMeta(value: string): string {
  return crypto.createHash('sha256').update(value).digest('hex');
}

/* Helper to normalize phone numbers – implementation omitted for brevity */
function normalizePhone(phone: string): string {
  // e.g., strip non‑numeric characters, ensure E.164 format, etc.
  return phone.replace(/\D+/g, '');
}

Meta 营销 API 集成

我们集成了两个 Meta API:

  1. Custom Audiences API – 同步客户列表。
  2. Conversions API – 发送实时服务器端事件。

自定义受众同步

// lib/meta-audience-sync.ts
import { chunk, sleep } from './utils';

const META_API_VERSION = 'v18.0';

interface AudienceUser {
  email?: string;
  phone?: string;
  firstName?: string;
  lastName?: string;
}

async function addUsersToAudience(
  audienceId: string,
  users: AudienceUser[],
  accessToken: string
): Promise {
  const BATCH_SIZE = 10_000;
  const batches = chunk(users, BATCH_SIZE);

  for (const batch of batches) {
    const payload = {
      schema: ['EMAIL', 'PHONE', 'FN', 'LN'],
      data: batch.map(user => [
        user.email ? hashForMeta(user.email.toLowerCase()) : '',
        user.phone ? hashForMeta(normalizePhone(user.phone)) : '',
        user.firstName ? hashForMeta(user.firstName.toLowerCase()) : '',
        user.lastName ? hashForMeta(user.lastName.toLowerCase()) : '',
      ]),
    };

    const response = await fetch(
      `https://graph.facebook.com/${META_API_VERSION}/${audienceId}/users`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ payload, access_token: accessToken }),
      }
    );

    if (!response.ok) {
      const err = await response.text();
      throw new Error(`Meta API error: ${err}`);
    }

    // Respect rate limits
    await sleep(1000);
  }
}

发送购买事件(Conversions API)

// lib/meta-conversions.ts
import crypto from 'crypto';

interface ProcessedCustomer {
  emailHash?: string;
  phoneHash?: string;
  // …other hashed fields
}

interface ShopifyOrder {
  id: number;
  created_at: string;
  total_price: string;
  currency: string;
  // …other order fields
}

async function sendPurchaseEvent(
  pixelId: string,
  customer: ProcessedCustomer,
  order: ShopifyOrder,
  accessToken: string
) {
  const event = {
    event_name: 'Purchase',
    event_time: Math.floor(new Date(order.created_at).getTime() / 1000),
    event_id: `order_${order.id}`,
    action_source: 'website',
    user_data: {
      em: customer.emailHash,
      ph: customer.phoneHash,
      // add other hashed identifiers as needed
    },
    custom_data: {
      currency: order.currency,
      value: parseFloat(order.total_price),
      // optional: contents, order_id, etc.
    },
  };

  const response = await fetch(
    `https://graph.facebook.com/${META_API_VERSION}/${pixelId}/events`,
    {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        data: [event],
        access_token: accessToken,
      }),
    }
  );

  if (!response.ok) {
    const err = await response.text();
    throw new Error(`Conversions API error: ${err}`);
  }
}

要点

  • First‑party data 绕过基于浏览器的跟踪限制。
  • Server‑side hashing 满足 Meta 的隐私要求,同时保持匹配率。
  • Batching & rate‑limit handling 在处理大型受众列表时至关重要。

如果您正在构建类似的管道,上述模式应为您提供坚实的起点。祝开发愉快!

// Example: Sending a Conversions API event to Meta
const event = {
  event_name: 'Purchase',
  event_time: Math.floor(Date.now() / 1000),
  user_data: {
    em: customer.hashedEmail,
    ph: customer.hashedPhone,
    client_ip_address: order.client_details?.browser_ip,
    client_user_agent: order.client_details?.user_agent,
  },
  custom_data: {
    value: parseFloat(order.total_price),
    currency: order.currency,
    content_ids: order.line_items.map(i => i.product_id.toString()),
    content_type: 'product',
    num_items: order.line_items.reduce((s, i) => s + i.quantity, 0),
  },
};

await fetch(
  `https://graph.facebook.com/${META_API_VERSION}/${pixelId}/events`,
  {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ data: [event], access_token: accessToken }),
  }
);

受众细分

客户会自动分类到与 Meta 同步的细分群体中。

enum CustomerSegment {
  NEW = 'new',
  ENGAGED = 'engaged',
  EXISTING = 'existing',
}

function classifyCustomer(customer: CustomerWithOrders): CustomerSegment {
  if (customer.orders_count === 0) return CustomerSegment.ENGAGED;
  if (customer.orders_count >= 1) return CustomerSegment.EXISTING;
  return CustomerSegment.NEW;
}

关键挑战与解决方案

限流

Meta 实施严格的限制。我们使用指数退避重试。

async function withRetry(
  fn: () => Promise,
  maxRetries = 3
): Promise {
  let lastError: Error;

  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err as Error;
      await sleep(2 ** i * 1000); // exponential back‑off
    }
  }
  throw lastError!;
}

幂等的 Webhook 处理

async function processWebhookOnce(
  webhookId: string,
  handler: () => Promise
): Promise {
  const existing = await prisma.processedWebhook.findUnique({
    where: { id: webhookId },
  });

  if (existing) return; // already processed

  await handler();

  await prisma.processedWebhook.create({
    data: { id: webhookId, processedAt: new Date() },
  });
}

结果

使用 Audience+ 的店铺通常会看到:

  • 提升 50–100 % 的转化(对 Meta 可见)
  • 提升 10–20 % 的 ROAS
  • 首次实现正确的排除和再营销

我会做的不同之处

  • 首先使用 Conversions API
  • 更早地加入 监控
  • 从一开始就使用 队列,而不是同步处理。

试用

如果您想直接使用而不自行构建,请查看:

👉 https://www.audience-plus.com

10 分钟设置,完全自动化。

对实现有疑问吗?请在评论中提出。

Back to Blog

相关文章

阅读更多 »