构建生产就绪的定时推送通知系统,使用 NestJS Cron 与 Firebase
Source: Dev.to
从即时投递到精准调度
构建一个可靠的基于 cron 的通知调度器,能够处理时区复杂性、数据库竞争条件以及移动端数据契约。
当你的 Firebase 推送通知服务器已经通过 BullMQ 队列实现了对数十万用户的批量投递时,下一步自然是调度投递。听起来很简单,对吧?只要加一个 cron 任务和一张调度表。
我实际遇到的,却是一段穿越时区地狱、TypeORM NULL 比较怪癖、游标分页 bug,以及数据库 schema 与移动端数据契约细微差别的奇妙旅程。下面是我如何构建一个生产就绪的调度通知系统,它现在每天能够处理数千条基于时间的通知,且零漏投。
起点:已有的批量通知系统
在讨论调度之前,先把背景说清楚。我已经有了一套稳健的批量通知基础设施:
// firebase.controller.ts - Existing mass notification endpoint
@Post('send-to-conditional-users')
async sendConditionalNotifications(
@Query() query: SendMultiNotificationDto,
@Body() body?: SendDataDto,
): Promise {
const jobId = `conditional-${uuidv4()}`;
const jobData: ConditionalNotificationParams = {
...query,
jobId,
chunkSize: 500,
chunkDelay: 2000,
data: body || {},
};
// Add to BullMQ queue - processed asynchronously
await this.pushQueue.add('send-conditional-notification', jobData, {
jobId,
removeOnComplete: true,
removeOnFail: false,
});
return CommonResponseDto.messageSendSuccess();
}
关键组件
- BullMQ 用于异步作业处理
- 基于游标的分页 用于高效的数据库查询
- 分块投递(每块 500 条 token,间隔 2 秒)
- 失败重试逻辑 自动处理投递失败的情况
- Redis 管理作业队列
挑战:如何让这些通知在未来的特定时间点执行?
目标:像专业人士一样调度通知
需求
- 将调度通知存入 MySQL
- 使用 NestJS cron 每分钟检查一次待执行的通知
- 在预定时间触发已有的通知流水线
- 支持所有相同的过滤选项(性别、年龄、平台等)
- 追踪执行状态(
pending、processing、completed) - 正确处理时区(韩国标准时间)
设想的架构
┌─────────────────┐
│ Schedule API │ ──► INSERT into MySQL
│ (Create/Update) │
└─────────────────┘
↓
┌──────────────────────┐
│ MySQL Schedule DB │
│ scheduled_send_date │
└──────────────────────┘
↓ 每分钟一次
┌──────────────────────┐
│ NestJS Cron Job │
│ @Cron(EVERY_MINUTE) │
└──────────────────────┘
↓ 找到匹配?
┌──────────────────────┐
│ BullMQ Queue │
│ (Existing Pipeline) │
└──────────────────────┘
↓
┌──────────────────────┐
│ Firebase Worker │
│ (Send to users) │
└──────────────────────┘
实现过程:边缘案例的探索
步骤 1:创建调度表
// push-notification-schedule.entity.ts
@Entity({ name: 'push_notification_schedule' })
@Index(['sent_yn', 'scheduled_send_date']) // Critical for cron queries
@Index(['job_id'])
export class PushNotificationSchedule {
@PrimaryGeneratedColumn({ type: 'int' })
seq: number;
@Column({ type: 'varchar', length: 200, nullable: true })
job_id: string; // BullMQ job ID once queued
@Column({ type: 'varchar', length: 200 })
title: string;
@Column({ type: 'text' })
content: string;
@Column({ type: 'datetime' })
scheduled_send_date: Date; // When to send
@Column({ type: 'datetime', precision: 6, nullable: true })
actual_send_start_date: Date | null; // When actually sent
@Column({ type: 'datetime', precision: 6, nullable: true })
actual_send_end_date: Date | null;
@Column({ type: 'int', default: 0 })
total_send_count: number; // How many sent
@Column({ type: 'tinyint', width: 1, default: 0 })
sent_yn: number; // 0: pending, 1: completed
// Filter fields (same as immediate API)
@Column({ type: 'varchar', length: 1, nullable: true })
push_onoff: string; // 'Y' = only subscribers
@Column({ type: 'varchar', length: 1, nullable: true })
marketing_onoff: string;
@Column({ type: 'varchar', length: 20, nullable: true })
topic: string; // FCM topic
// Mobile app deep‑link data
@Column({ type: 'varchar', length: 50, nullable: true })
division: string; // e.g., 'bible'
@Column({ type: 'int', nullable: true })
version: number;
@Column({ type: 'int', nullable: true })
bible_code: number; // Database uses snake_case
@Column({ type: 'int', nullable: true })
chapter: number;
@Column({ type: 'int', nullable: true })
section: number;
@Column({ type: 'varchar', length: 500, nullable: true })
landing_url: string;
@CreateDateColumn({ type: 'datetime' })
regdate: Date;
}
关键设计决策
sent_yn标记防止重复执行。job_id用来追踪一旦入队的 BullMQ 作业。actual_send_start_date/actual_send_end_date使用微秒精度,便于分析。bible_code等字段支持圣经 App 的深度链接。
步骤 2:天真的 Cron 实现(未成功)
我的第一次尝试看起来合情合理:
// scheduler.service.ts - First attempt (broken!)
@Cron(CronExpression.EVERY_MINUTE)
async handleScheduledPushNotifications() {
const nowKST = moment.tz('Asia/Seoul').startOf('minute').toDate();
// Find schedules within ±1 minute window
const startWindow = moment(nowKST).subtract(1, 'minutes').toDate();
const endWindow = moment(nowKST).add(1, 'minutes').toDate();
const pendingSchedules = await this.scheduleRepository
.createQueryBuilder('schedule')
.where('schedule.sent_yn = :sentYn', { sentYn: 0 })
.andWhere('schedule.job_id = :jobId', { jobId: null }) // ❌ BUG!
.andWhere('schedule.scheduled_send_date BETWEEN :start AND :end', {
start: startWindow,
end: endWindow,
})
.getMany();
for (const schedule of pendingSchedules) {
await this.processSchedule(schedule);
}
}
private async processSchedule(schedule: PushNotificationSchedule) {
const jobId = `scheduled-${schedule.seq}-${uuidv4()}`;
// Mark as processing
await this.scheduleRepository.update(
{
seq: schedule.seq,
sent_yn: 0,
job_id: null, // ❌ BUG!
},
{
job_id: jobId,
sent_yn: 1,
},
);
// Queue the job
const jobData = { /* ...build payload from schedule... */ };
await this.pushQueue.add('send-scheduled-notification', jobData, { jobId });
}
出错原因
-
TypeORM 的
NULL比较陷阱
条件schedule.job_id = :jobId并传入jobId: null会生成WHERE job_id = NULL,这永远为假。正确的 SQL 应该是WHERE job_id IS NULL,而在 TypeORM 中需要使用.isNull()或原始条件。 -
update调用中的同样问题
在WHERE部分提供{ job_id: null }同样会被翻译成job_id = NULL,导致行匹配失败,调度记录得不到更新。
这些 bug 让 cron 任务始终找不到待处理的调度记录,导致通知被漏投。