使用 Node.js 构建分布式视频转码系统

发布: (2025年12月17日 GMT+8 20:52)
5 min read
原文: Dev.to

Source: Dev.to

Building A Distributed Video Transcoding System with Node.js 的封面图片

为什么需要 Broker?

Broker 是分布式系统的 the “hello world”,原因有二:

  1. 易于快速启动 – 它们强制使用 hive / master‑node 模式,天然可扩展。

    node                
    node    hive / broker   client‑facing server    client
    node                                                client

纯 JavaScript 代理(Broker)用于 Node.js

// broker.js
import Bunny from "bunnimq";
import path from "path";
import { fileURLToPath } from "url";

Bunny({
  port: 3000,
  DEBUG: true,
  cwd: path.dirname(fileURLToPath(import.meta.url)), // path to the .auth file
  queue: {
    Durable: true,
    MessageExpiry: 60 // 1 hour
  }
});

低层 Node.js 优化

  • 对象 → 二进制编译器
  • SharedArrayBuffer 与工作线程
const buffer = new SharedArrayBuffer();
const worker = new Worker(); // **Prerequisite:** FFmpeg must be installed and available in your `PATH`. Test it:
ffmpeg -i img.jpg img.png

分布式视频转码示例

初始化项目

npm init -y && npm i bunnimq bunnimq-driver

文件夹结构

ffmpegserver/
  server.js    # ← 中枢(broker)
  producer.js  # 面向客户端的服务器
  consumer.js  # 节点服务器 / 工作进程
  .auth        # 用于生产者和消费者验证的凭证(类似 .env)

.auth

在此处以 username:password:privileges 形式填写你的密钥凭证(特权请参见 repo):

sk:mypassword:4
jane:doeeee:1
john:doees:3

Server (Hive) – server.js

简单的非 TLS 设置(TLS 支持 – 请参阅 GitHub 仓库):

import Bunny from "bunnimq";
import path from "path";
import { fileURLToPath } from "url";

Bunny({
  port: 3000,
  DEBUG: true,
  cwd: path.dirname(fileURLToPath(import.meta.url)), // for .auth file
  queue: {
    Durable: true,
    QueueExpiry: 0,
    MessageExpiry: 3600
  }
});

生产者 – producer.js

服务器浏览器和其他客户端与之通信。它接受请求并将作业推送到 Hive 中。

import BunnyMQ from "bunnimq-driver";
import fs from "node:fs/promises";

const bunny = new BunnyMQ({
  port: 3000,
  host: "localhost",
  username: "sk",
  password: "mypassword",
});

声明队列(如果不存在)

bunny.queueDeclare(
  {
    name: "transcode_queue",
    config: {
      QueueExpiry: 60,
      MessageExpiry: 20,
      AckExpiry: 10,
      Durable: true,
      noAck: false,
    },
  },
  (res) => {
    console.log("Queue creation:", res);
  }
);

发布视频转码作业

演示时我们从本地文件夹读取视频(请将路径替换为你自己的):

async function processVideos() {
  const videos = await fs.readdir(
    "C:/Users/[path to a folder with videos]/Videos/Capcut/test"
  ); // 通常是存储桶链接

  for (const video of videos) {
    const job = {
      id: Date.now() + Math.random().toString(36).substring(2),
      input: `C:/Users/[path to a folder with videos]/Videos/Capcut/test/${video}`,
      outputFormat: "webm",
    };

    // 放入队列
    bunny.publish("transcode_queue", JSON.stringify(job), (res) => {
      console.log(`Job ${job.id} published:`, res ? "ok" : "400");
    });
  }
}

processVideos();

消费者 – consumer.js

这些是负责拉取任务、转码视频并确认完成的工作节点。

import BunnyMQ from "bunnimq-driver";
import { spawn } from "child_process";
import path from "path";

const bunny = new BunnyMQ({
  port: 3000,
  host: "localhost",
  username: "john",
  password: "doees",
});

消费 transcode_queue

bunny.consume("transcode_queue", async (msg) => {
  console.log("Received message:", msg);

  try {
    const { input, outputFormat } = JSON.parse(msg);

    // Normalise paths
    const absInput = path.resolve(input);
    const output = absInput.replace(/\.[^.]+$/, `.${outputFormat}`);

    console.log(
      `Spawning: ffmpeg -i "${absInput}" -f ${outputFormat} "${output}" -y`
    );

    await new Promise((resolve, reject) => {
      const ffmpeg = spawn(
        "ffmpeg",
        ["-i", absInput, "-f", outputFormat, output, "-y"],
        { shell: true } // helps Windows find ffmpeg.exe
      );

      ffmpeg.on("error", reject);

      // FFmpeg logs to stderr
      ffmpeg.stderr.on("data", (chunk) => {
        process.stderr.write(chunk);
      });

      ffmpeg.on("close", (code) => {
        if (code === 0) {
          console.log(`Transcoding complete: ${output}`);
          // Acknowledge the message
          bunny.Ack((ok) => console.log("Ack sent:", ok));
          resolve();
        } else {
          reject(new Error(`FFmpeg exited with code ${code}`));
        }
      });
    });
  } catch (err) {
    console.error("Failed to process job:", err);
    // Optionally reject / requeue the message here
  }
});

运行系统

  1. 启动 broker

    node server.js
  2. 启动一个或多个消费者(在不同的终端)

    node consumer.js
  3. 发布任务

    node producer.js

您应该会看到每个消费者从队列中取出任务,调用 FFmpeg,并确认完成。扩展非常简单,只需在同一台或不同的机器上启动额外的 consumer.js 进程,这些机器能够访问 broker。

祝转码愉快!

更多内容

在这里找到我

Back to Blog

相关文章

阅读更多 »