构建生产就绪的定时推送通知系统,使用 NestJS Cron 与 Firebase

发布: (2025年12月16日 GMT+8 02:00)
6 min read
原文: Dev.to

Source: Dev.to

从即时投递到精准调度

构建一个可靠的基于 cron 的通知调度器,能够处理时区复杂性、数据库竞争条件以及移动端数据契约。

当你的 Firebase 推送通知服务器已经通过 BullMQ 队列实现了对数十万用户的批量投递时,下一步自然是调度投递。听起来很简单,对吧?只要加一个 cron 任务和一张调度表。

我实际遇到的,却是一段穿越时区地狱、TypeORM NULL 比较怪癖、游标分页 bug,以及数据库 schema 与移动端数据契约细微差别的奇妙旅程。下面是我如何构建一个生产就绪的调度通知系统,它现在每天能够处理数千条基于时间的通知,且零漏投。

起点:已有的批量通知系统

在讨论调度之前,先把背景说清楚。我已经有了一套稳健的批量通知基础设施:

// firebase.controller.ts - Existing mass notification endpoint
@Post('send-to-conditional-users')
async sendConditionalNotifications(
  @Query() query: SendMultiNotificationDto,
  @Body() body?: SendDataDto,
): Promise {
  const jobId = `conditional-${uuidv4()}`;

  const jobData: ConditionalNotificationParams = {
    ...query,
    jobId,
    chunkSize: 500,
    chunkDelay: 2000,
    data: body || {},
  };

  // Add to BullMQ queue - processed asynchronously
  await this.pushQueue.add('send-conditional-notification', jobData, {
    jobId,
    removeOnComplete: true,
    removeOnFail: false,
  });

  return CommonResponseDto.messageSendSuccess();
}

关键组件

  • BullMQ 用于异步作业处理
  • 基于游标的分页 用于高效的数据库查询
  • 分块投递(每块 500 条 token,间隔 2 秒)
  • 失败重试逻辑 自动处理投递失败的情况
  • Redis 管理作业队列

挑战:如何让这些通知在未来的特定时间点执行?

目标:像专业人士一样调度通知

需求

  • 将调度通知存入 MySQL
  • 使用 NestJS cron 每分钟检查一次待执行的通知
  • 在预定时间触发已有的通知流水线
  • 支持所有相同的过滤选项(性别、年龄、平台等)
  • 追踪执行状态(pendingprocessingcompleted
  • 正确处理时区(韩国标准时间)

设想的架构

┌─────────────────┐
│  Schedule API   │ ──► INSERT into MySQL
│ (Create/Update) │
└─────────────────┘

┌──────────────────────┐
│  MySQL Schedule DB   │
│  scheduled_send_date │
└──────────────────────┘
        ↓ 每分钟一次
┌──────────────────────┐
│   NestJS Cron Job    │
│ @Cron(EVERY_MINUTE) │
└──────────────────────┘
        ↓ 找到匹配?
┌──────────────────────┐
│    BullMQ Queue      │
│ (Existing Pipeline)  │
└──────────────────────┘

┌──────────────────────┐
│  Firebase Worker     │
│  (Send to users)     │
└──────────────────────┘

实现过程:边缘案例的探索

步骤 1:创建调度表

// push-notification-schedule.entity.ts
@Entity({ name: 'push_notification_schedule' })
@Index(['sent_yn', 'scheduled_send_date'])  // Critical for cron queries
@Index(['job_id'])
export class PushNotificationSchedule {
  @PrimaryGeneratedColumn({ type: 'int' })
  seq: number;

  @Column({ type: 'varchar', length: 200, nullable: true })
  job_id: string; // BullMQ job ID once queued

  @Column({ type: 'varchar', length: 200 })
  title: string;

  @Column({ type: 'text' })
  content: string;

  @Column({ type: 'datetime' })
  scheduled_send_date: Date; // When to send

  @Column({ type: 'datetime', precision: 6, nullable: true })
  actual_send_start_date: Date | null; // When actually sent

  @Column({ type: 'datetime', precision: 6, nullable: true })
  actual_send_end_date: Date | null;

  @Column({ type: 'int', default: 0 })
  total_send_count: number; // How many sent

  @Column({ type: 'tinyint', width: 1, default: 0 })
  sent_yn: number; // 0: pending, 1: completed

  // Filter fields (same as immediate API)
  @Column({ type: 'varchar', length: 1, nullable: true })
  push_onoff: string; // 'Y' = only subscribers

  @Column({ type: 'varchar', length: 1, nullable: true })
  marketing_onoff: string;

  @Column({ type: 'varchar', length: 20, nullable: true })
  topic: string; // FCM topic

  // Mobile app deep‑link data
  @Column({ type: 'varchar', length: 50, nullable: true })
  division: string; // e.g., 'bible'

  @Column({ type: 'int', nullable: true })
  version: number;

  @Column({ type: 'int', nullable: true })
  bible_code: number; // Database uses snake_case

  @Column({ type: 'int', nullable: true })
  chapter: number;

  @Column({ type: 'int', nullable: true })
  section: number;

  @Column({ type: 'varchar', length: 500, nullable: true })
  landing_url: string;

  @CreateDateColumn({ type: 'datetime' })
  regdate: Date;
}

关键设计决策

  • sent_yn 标记防止重复执行。
  • job_id 用来追踪一旦入队的 BullMQ 作业。
  • actual_send_start_date / actual_send_end_date 使用微秒精度,便于分析。
  • bible_code 等字段支持圣经 App 的深度链接。

步骤 2:天真的 Cron 实现(未成功)

我的第一次尝试看起来合情合理:

// scheduler.service.ts - First attempt (broken!)
@Cron(CronExpression.EVERY_MINUTE)
async handleScheduledPushNotifications() {
  const nowKST = moment.tz('Asia/Seoul').startOf('minute').toDate();

  // Find schedules within ±1 minute window
  const startWindow = moment(nowKST).subtract(1, 'minutes').toDate();
  const endWindow = moment(nowKST).add(1, 'minutes').toDate();

  const pendingSchedules = await this.scheduleRepository
    .createQueryBuilder('schedule')
    .where('schedule.sent_yn = :sentYn', { sentYn: 0 })
    .andWhere('schedule.job_id = :jobId', { jobId: null }) // ❌ BUG!
    .andWhere('schedule.scheduled_send_date BETWEEN :start AND :end', {
      start: startWindow,
      end: endWindow,
    })
    .getMany();

  for (const schedule of pendingSchedules) {
    await this.processSchedule(schedule);
  }
}

private async processSchedule(schedule: PushNotificationSchedule) {
  const jobId = `scheduled-${schedule.seq}-${uuidv4()}`;

  // Mark as processing
  await this.scheduleRepository.update(
    {
      seq: schedule.seq,
      sent_yn: 0,
      job_id: null, // ❌ BUG!
    },
    {
      job_id: jobId,
      sent_yn: 1,
    },
  );

  // Queue the job
  const jobData = { /* ...build payload from schedule... */ };
  await this.pushQueue.add('send-scheduled-notification', jobData, { jobId });
}

出错原因

  1. TypeORM 的 NULL 比较陷阱
    条件 schedule.job_id = :jobId 并传入 jobId: null 会生成 WHERE job_id = NULL,这永远为假。正确的 SQL 应该是 WHERE job_id IS NULL,而在 TypeORM 中需要使用 .isNull() 或原始条件。

  2. update 调用中的同样问题
    WHERE 部分提供 { job_id: null } 同样会被翻译成 job_id = NULL,导致行匹配失败,调度记录得不到更新。

这些 bug 让 cron 任务始终找不到待处理的调度记录,导致通知被漏投。

Back to Blog

相关文章

阅读更多 »

开发工具中心 API

我构建的作品提交给 Xano AI 驱动的后端挑战 https://dev.to/challenges/xano-2025-11-20:生产就绪的公共 API 标题:DevTools Resourc...