Building a Real-Time Data Pipeline from Shopify to Meta's Marketing API
Source: Dev.to
I spent the last few months building Audience+, a tool that syncs Shopify customer data to Meta’s advertising platform in real time.
Below is a technical breakdown of how it works, the challenges we solved, and concrete code patterns that may help if you’re building something similar.
The Problem We’re Solving
Meta’s browser-based tracking is fundamentally broken:
- Since iOS 14.5, App Tracking Transparency hides ~75% of iPhone users.
- The Meta Pixel only retains data for 180 days.
- As a result, Meta optimizes on only 30–40% of real conversion data for most stores.
The solution: send first‑party customer and purchase data directly from Shopify to Meta using server‑side APIs.
Architecture Overview
┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│ Shopify Store │──▶│ Audience+ API │──▶│   Meta API    │
│  (Webhooks)   │   │ (Processing)  │   │               │
└───────────────┘   └───────┬───────┘   └───────────────┘
                            │
                            ▼
                    ┌───────────────┐
                    │  PostgreSQL   │
                    │ (Customer DB) │
                    └───────────────┘
Tech Stack
| Component | Choice |
|---|---|
| Framework | Next.js 15 (App Router) |
| Language | TypeScript |
| API Layer | tRPC |
| Database | PostgreSQL (Neon serverless) |
| ORM | Prisma |
| Auth | Better‑Auth + Shopify OAuth |
| Hosting | Vercel |
Shopify Webhook Integration
Shopify sends webhooks for customer and order lifecycle events.
We verify each request using HMAC signatures before processing to ensure authenticity.
// app/api/webhooks/shopify/route.ts
import { NextRequest, NextResponse } from 'next/server';
import crypto from 'crypto';

// handleNewOrder, handleNewCustomer, and handleCustomerUpdate are
// application handlers defined elsewhere.

export async function POST(req: NextRequest) {
  // Read the raw body: the HMAC is computed over the exact payload Shopify sent.
  const body = await req.text();
  const hmac = req.headers.get('x-shopify-hmac-sha256');

  if (!verifyShopifyWebhook(body, hmac)) {
    return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
  }

  const topic = req.headers.get('x-shopify-topic');
  const payload = JSON.parse(body);

  switch (topic) {
    case 'orders/create':
      await handleNewOrder(payload);
      break;
    case 'customers/create':
      await handleNewCustomer(payload);
      break;
    case 'customers/update':
      await handleCustomerUpdate(payload);
      break;
    default:
      // Topics we do not handle are still acknowledged below.
      break;
  }

  return NextResponse.json({ received: true });
}

function verifyShopifyWebhook(body: string, hmac: string | null): boolean {
  if (!hmac) return false;

  const expected = crypto
    .createHmac('sha256', process.env.SHOPIFY_WEBHOOK_SECRET!)
    .update(body, 'utf8')
    .digest();
  const received = Buffer.from(hmac, 'base64');

  // timingSafeEqual throws when the lengths differ, so compare lengths first.
  return (
    expected.length === received.length &&
    crypto.timingSafeEqual(expected, received)
  );
}
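To exercise the verification logic locally, it helps to produce a payload signed the same way Shopify signs it. The following is a minimal sketch under stated assumptions, not code from Audience+: `signShopifyPayload` and `isValidShopifySignature` are hypothetical names, and the secret is a placeholder.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical test helper: computes the base64 HMAC-SHA256 value that would
// appear in the x-shopify-hmac-sha256 header for this raw body.
function signShopifyPayload(rawBody: string, secret: string): string {
  return createHmac('sha256', secret).update(rawBody, 'utf8').digest('base64');
}

// Hypothetical verifier mirroring the check in the route handler, with the
// secret passed in so it can be tested without environment variables.
function isValidShopifySignature(
  rawBody: string,
  header: string | null,
  secret: string
): boolean {
  if (!header) return false;
  const expected = createHmac('sha256', secret).update(rawBody, 'utf8').digest();
  const received = Buffer.from(header, 'base64');
  // Compare lengths first: timingSafeEqual throws on a length mismatch.
  return expected.length === received.length && timingSafeEqual(expected, received);
}

const secret = 'test-secret'; // placeholder, not a real webhook secret
const body = JSON.stringify({ id: 1, email: 'jane@example.com' });
const signature = signShopifyPayload(body, secret);

console.log(isValidShopifySignature(body, signature, secret));       // true
console.log(isValidShopifySignature(body + ' ', signature, secret)); // false
```

The second call fails because even a single extra space changes the digest, which is why the handler reads the raw text with `req.text()` before parsing JSON.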
Customer Data Normalization
Customer data is normalized and hashed to meet Meta’s requirements (lower‑case, trimmed, SHA‑256).
// lib/customer-processor.ts
import crypto from 'crypto';

interface ShopifyCustomer {
  id: number;
  email: string;
  phone?: string;
  first_name?: string;
  last_name?: string;
  orders_count: number;
  total_spent: string;
  created_at: string;
}

interface MetaUserData {
  em?: string;
  ph?: string;
  fn?: string;
  ln?: string;
  external_id?: string;
}

export function processCustomerForMeta(customer: ShopifyCustomer): MetaUserData {
  const userData: MetaUserData = {};

  // Each identifier is normalized first, then hashed.
  if (customer.email) {
    userData.em = hashForMeta(customer.email.toLowerCase().trim());
  }
  if (customer.phone) {
    userData.ph = hashForMeta(normalizePhone(customer.phone));
  }
  if (customer.first_name) {
    userData.fn = hashForMeta(customer.first_name.toLowerCase().trim());
  }
  if (customer.last_name) {
    userData.ln = hashForMeta(customer.last_name.toLowerCase().trim());
  }
  userData.external_id = hashForMeta(customer.id.toString());

  return userData;
}

// SHA-256 hex digest of an already-normalized value.
export function hashForMeta(value: string): string {
  return crypto.createHash('sha256').update(value).digest('hex');
}

// Minimal phone normalization: keeps digits only. Meta expects the country
// code to be included; this helper does not add one when it is missing.
export function normalizePhone(phone: string): string {
  return phone.replace(/\D+/g, '');
}
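Why normalize before hashing? A hash only matches on Meta’s side if both parties hashed exactly the same string. The sketch below illustrates this with hypothetical helper names (`sha256Hex`, `hashEmail`, `hashPhone`) and made-up sample values; it assumes phone numbers already carry a country code.

```typescript
import { createHash } from 'node:crypto';

function sha256Hex(value: string): string {
  return createHash('sha256').update(value, 'utf8').digest('hex');
}

// Trim and lower-case before hashing so cosmetic differences do not matter.
function hashEmail(email: string): string {
  return sha256Hex(email.trim().toLowerCase());
}

// Digits only. Assumes the input already includes the country code.
function hashPhone(phone: string): string {
  return sha256Hex(phone.replace(/\D+/g, ''));
}

const messy = hashEmail('  Jane.Doe@Example.com ');
const clean = hashEmail('jane.doe@example.com');

console.log(messy === clean); // true: same hash after normalization
console.log(clean.length);    // 64 (hex characters)

// Skipping normalization produces a different hash that will not match.
console.log(sha256Hex('Jane.Doe@Example.com') === clean); // false

console.log(hashPhone('+1 (555) 010-0199') === hashPhone('15550100199')); // true
```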
Meta Marketing API Integration
We integrate with two Meta APIs:
- Custom Audiences API – syncs customer lists.
- Conversions API – sends real‑time server‑side events.
Custom Audience Sync
// lib/meta-audience-sync.ts
import { chunk, sleep } from './utils';
// Assumes these helpers are exported from lib/customer-processor.ts.
import { hashForMeta, normalizePhone } from './customer-processor';

const META_API_VERSION = 'v18.0';

interface AudienceUser {
  email?: string;
  phone?: string;
  firstName?: string;
  lastName?: string;
}

async function addUsersToAudience(
  audienceId: string,
  users: AudienceUser[],
  accessToken: string
): Promise<void> {
  const BATCH_SIZE = 10_000;
  const batches = chunk(users, BATCH_SIZE);

  for (const batch of batches) {
    // Each row follows the order declared in `schema`; missing values are sent as ''.
    const payload = {
      schema: ['EMAIL', 'PHONE', 'FN', 'LN'],
      data: batch.map(user => [
        user.email ? hashForMeta(user.email.toLowerCase().trim()) : '',
        user.phone ? hashForMeta(normalizePhone(user.phone)) : '',
        user.firstName ? hashForMeta(user.firstName.toLowerCase().trim()) : '',
        user.lastName ? hashForMeta(user.lastName.toLowerCase().trim()) : '',
      ]),
    };

    const response = await fetch(
      `https://graph.facebook.com/${META_API_VERSION}/${audienceId}/users`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ payload, access_token: accessToken }),
      }
    );

    if (!response.ok) {
      const err = await response.text();
      throw new Error(`Meta API error: ${err}`);
    }

    // Respect rate limits: pause one second between batches.
    await sleep(1000);
  }
}
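The sync code imports `chunk` and `sleep` from `./utils`, which the article does not show. A minimal sketch of what that module might contain follows; the implementations are assumptions, and only the two names come from the import above.

```typescript
// Hypothetical contents of lib/utils.ts.

// Splits an array into consecutive slices of at most `size` items.
export function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const result: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    result.push(items.slice(i, i + size));
  }
  return result;
}

// Resolves after roughly `ms` milliseconds.
export function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

console.log(JSON.stringify(chunk([1, 2, 3, 4, 5], 2))); // [[1,2],[3,4],[5]]
console.log(chunk([], 10_000).length);                  // 0
```

With an empty customer list, `chunk` returns no batches, so the sync loop makes no API calls at all.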
Sending Purchase Events (Conversions API)
// lib/meta-conversions.ts
const META_API_VERSION = 'v18.0'; // same version as lib/meta-audience-sync.ts

interface ProcessedCustomer {
  emailHash?: string;
  phoneHash?: string;
  // …other hashed fields
}

interface ShopifyOrder {
  id: number;
  created_at: string;
  total_price: string;
  currency: string;
  // …other order fields
}

async function sendPurchaseEvent(
  pixelId: string,
  customer: ProcessedCustomer,
  order: ShopifyOrder,
  accessToken: string
): Promise<void> {
  const event = {
    event_name: 'Purchase',
    // Unix timestamp in seconds, taken from the order itself.
    event_time: Math.floor(new Date(order.created_at).getTime() / 1000),
    // A stable ID per order lets Meta deduplicate repeated sends of the same event.
    event_id: `order_${order.id}`,
    action_source: 'website',
    user_data: {
      em: customer.emailHash,
      ph: customer.phoneHash,
      // add other hashed identifiers as needed
    },
    custom_data: {
      currency: order.currency,
      value: parseFloat(order.total_price),
      // optional: contents, order_id, etc.
    },
  };

  const response = await fetch(
    `https://graph.facebook.com/${META_API_VERSION}/${pixelId}/events`,
    {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        data: [event],
        access_token: accessToken,
      }),
    }
  );

  if (!response.ok) {
    const err = await response.text();
    throw new Error(`Conversions API error: ${err}`);
  }
}
Takeaways
- First‑party data bypasses the limitations of browser‑based tracking.
- Server‑side hashing satisfies Meta’s privacy requirements while preserving match rates.
- Batching & rate‑limit handling are essential when dealing with large audience lists.
If you’re building a similar pipeline, the patterns above should give you a solid starting point. Happy hacking!
// Example: Sending a Conversions API event to Meta
// Assumes customer, order, pixelId, accessToken, and META_API_VERSION are in scope.
const event = {
  event_name: 'Purchase',
  event_time: Math.floor(Date.now() / 1000),
  action_source: 'website', // required by the Conversions API
  user_data: {
    em: customer.hashedEmail,
    ph: customer.hashedPhone,
    // IP address and user agent are sent unhashed.
    client_ip_address: order.client_details?.browser_ip,
    client_user_agent: order.client_details?.user_agent,
  },
  custom_data: {
    value: parseFloat(order.total_price),
    currency: order.currency,
    content_ids: order.line_items.map(i => i.product_id.toString()),
    content_type: 'product',
    num_items: order.line_items.reduce((s, i) => s + i.quantity, 0),
  },
};

const response = await fetch(
  `https://graph.facebook.com/${META_API_VERSION}/${pixelId}/events`,
  {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ data: [event], access_token: accessToken }),
  }
);

if (!response.ok) {
  throw new Error(`Conversions API error: ${await response.text()}`);
}
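The `custom_data` mapping in the example above is the part most worth unit-testing, since it needs no network access. Here is a hedged sketch of it as a pure function. The interface shapes list only the fields read here, `buildPurchaseCustomData` is a hypothetical name, and the sketch assumes every line item has a `product_id`.

```typescript
// Minimal, assumed shapes: only the fields read below.
interface LineItem {
  product_id: number;
  quantity: number;
}

interface OrderForEvent {
  total_price: string;
  currency: string;
  line_items: LineItem[];
}

interface PurchaseCustomData {
  value: number;
  currency: string;
  content_ids: string[];
  content_type: 'product';
  num_items: number;
}

// Hypothetical helper: maps an order to the custom_data object of a Purchase event.
function buildPurchaseCustomData(order: OrderForEvent): PurchaseCustomData {
  const value = Number.parseFloat(order.total_price);
  if (!Number.isFinite(value)) {
    throw new Error(`Invalid total_price: ${order.total_price}`);
  }
  return {
    value,
    currency: order.currency,
    content_ids: order.line_items.map(item => String(item.product_id)),
    content_type: 'product',
    // Total units across all line items, not the number of distinct products.
    num_items: order.line_items.reduce((sum, item) => sum + item.quantity, 0),
  };
}

const customData = buildPurchaseCustomData({
  total_price: '59.90',
  currency: 'USD',
  line_items: [
    { product_id: 1001, quantity: 2 },
    { product_id: 1002, quantity: 1 },
  ],
});

console.log(customData.num_items);              // 3
console.log(customData.content_ids.join(','));  // 1001,1002
```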
Audience Segmentation
Customers are automatically classified into segments that stay in sync with Meta.
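enum CustomerSegment {
  NEW = 'new',
  ENGAGED = 'engaged',
  EXISTING = 'existing',
}

// CustomerWithOrders is the app's customer type; only orders_count is read here.
function classifyCustomer(customer: CustomerWithOrders): CustomerSegment {
  // No orders yet.
  if (customer.orders_count === 0) return CustomerSegment.ENGAGED;
  // At least one order.
  if (customer.orders_count >= 1) return CustomerSegment.EXISTING;
  // Only reached when orders_count is missing or not a valid count.
  return CustomerSegment.NEW;
}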
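Once customers are classified, they need to be grouped before each segment can be synced. The sketch below shows one way to do that, assuming (this is not stated in the article) that each segment maps to its own Custom Audience. `CustomerSummary`, `classify`, and `groupBySegment` are hypothetical names, and `classify` simply mirrors the rules above.

```typescript
type Segment = 'new' | 'engaged' | 'existing';

interface CustomerSummary {
  id: number;
  orders_count: number;
}

// Mirrors the classification rules shown above.
function classify(customer: CustomerSummary): Segment {
  if (customer.orders_count === 0) return 'engaged';
  if (customer.orders_count >= 1) return 'existing';
  return 'new';
}

// Groups customer IDs by segment so each group can be synced separately.
function groupBySegment(customers: CustomerSummary[]): Record<Segment, number[]> {
  const grouped: Record<Segment, number[]> = { new: [], engaged: [], existing: [] };
  for (const customer of customers) {
    grouped[classify(customer)].push(customer.id);
  }
  return grouped;
}

const segments = groupBySegment([
  { id: 1, orders_count: 0 },
  { id: 2, orders_count: 3 },
  { id: 3, orders_count: 1 },
]);

console.log(segments.engaged.join(','));  // 1
console.log(segments.existing.join(',')); // 2,3
console.log(segments.new.length);         // 0
```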
Key Challenges & Solutions
Rate Limiting
Meta enforces strict rate limits, so we retry failed calls with exponential back-off.
// sleep is the helper imported from './utils'.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Exponential back-off (1s, 2s, 4s, ...); no wait after the final attempt.
      if (i < maxRetries - 1) await sleep(2 ** i * 1000);
    }
  }
  throw lastError;
}
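A common refinement of plain exponential back-off is to cap the delay and add random jitter, so that many workers that failed at the same moment do not all retry at the same moment. This is a swapped-in variant, not what the article’s `withRetry` does; `backoffDelayMs` and its defaults are assumptions for illustration.

```typescript
// Delay before retry number `attempt` (0-based).
// The ceiling doubles each attempt and is capped at `capMs`; the actual delay
// is a random fraction of that ceiling ("full jitter").
function backoffDelayMs(
  attempt: number,
  baseMs = 1000,
  capMs = 30_000,
  random: () => number = Math.random // injectable for deterministic tests
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

// With the random factor pinned to 1, the output shows the ceilings themselves.
const ceilings = [0, 1, 2, 3, 4, 5].map(a => backoffDelayMs(a, 1000, 30_000, () => 1));
console.log(ceilings.join(', ')); // 1000, 2000, 4000, 8000, 16000, 30000
```

Inside a retry loop, the `sleep(2 ** i * 1000)` call would become `sleep(backoffDelayMs(i))`.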
Idempotent Webhook Processing
// Runs the handler only if this webhook ID has not been recorded yet.
async function processWebhookOnce(
  webhookId: string,
  handler: () => Promise<void>
): Promise<void> {
  const existing = await prisma.processedWebhook.findUnique({
    where: { id: webhookId },
  });
  if (existing) return; // already processed

  await handler();

  // Record the ID only after the handler succeeds, so failures can be retried.
  await prisma.processedWebhook.create({
    data: { id: webhookId, processedAt: new Date() },
  });
}
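One caveat with a check-then-run approach is that two concurrent deliveries of the same webhook can both pass the lookup before either records the ID. A possible alternative is to claim the ID first and release it on failure. The sketch below uses an in-memory store so it runs on its own; `WebhookStore`, `InMemoryWebhookStore`, and `processOnce` are hypothetical names. With a database, `claim` would presumably be an insert that relies on a unique constraint on the webhook ID (for example, the value of Shopify’s `X-Shopify-Webhook-Id` header).

```typescript
// Hypothetical store interface for claiming webhook IDs.
interface WebhookStore {
  claim(webhookId: string): Promise<boolean>; // true if this caller claimed it first
  release(webhookId: string): Promise<void>;  // undo the claim so a retry can run
}

class InMemoryWebhookStore implements WebhookStore {
  private readonly seen = new Set<string>();

  async claim(webhookId: string): Promise<boolean> {
    if (this.seen.has(webhookId)) return false;
    this.seen.add(webhookId);
    return true;
  }

  async release(webhookId: string): Promise<void> {
    this.seen.delete(webhookId);
  }
}

// Returns true if the handler ran, false if the delivery was a duplicate.
async function processOnce(
  store: WebhookStore,
  webhookId: string,
  handler: () => Promise<void>
): Promise<boolean> {
  if (!(await store.claim(webhookId))) return false;
  try {
    await handler();
    return true;
  } catch (err) {
    await store.release(webhookId); // allow a later redelivery to try again
    throw err;
  }
}

async function demo(): Promise<number> {
  const store = new InMemoryWebhookStore();
  let runs = 0;
  const handler = async () => { runs += 1; };
  // Two concurrent deliveries of the same webhook ID.
  await Promise.all([
    processOnce(store, 'wh_1', handler),
    processOnce(store, 'wh_1', handler),
  ]);
  return runs;
}

demo().then(runs => console.log(runs)); // 1
```

The trade-off is that a crash between claiming and finishing leaves the ID claimed, so a production version would likely need a status column or a claim timeout.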
Results
Stores using Audience+ typically see:
- 50–100 % more conversions visible to Meta
- 10–20 % ROAS improvement
- Correct exclusions and retargeting for the first time
What I’d Do Differently
- Start with the Conversions API first.
- Add monitoring earlier.
- Use queues from day one instead of synchronous processing.
Try It Out
If you want this without building it yourself, check out:
👉 https://www.audience-plus.com
10‑minute setup. Fully automated.
Have questions about the implementation? Drop them in the comments.