使用 Node.js 构建分布式视频转码系统
发布: (2025年12月17日 GMT+8 20:52)
5 min read
原文: Dev.to
Source: Dev.to

为什么需要 Broker?
Broker 是分布式系统的 the “hello world”,原因有二:
-
易于快速启动 – 它们强制使用 hive / master‑node 模式,天然可扩展。
node node hive / broker client‑facing server client node client
纯 JavaScript 代理(Broker)用于 Node.js
// broker.js
import Bunny from "bunnimq";
import path from "path";
import { fileURLToPath } from "url";
Bunny({
port: 3000,
DEBUG: true,
cwd: path.dirname(fileURLToPath(import.meta.url)), // path to the .auth file
queue: {
Durable: true,
MessageExpiry: 60 // 1 hour
}
});
低层 Node.js 优化
- 对象 → 二进制编译器
SharedArrayBuffer与工作线程
const buffer = new SharedArrayBuffer();
const worker = new Worker(); // **Prerequisite:** FFmpeg must be installed and available in your `PATH`. Test it:
ffmpeg -i img.jpg img.png
分布式视频转码示例
初始化项目
npm init -y && npm i bunnimq bunnimq-driver
文件夹结构
ffmpegserver/
server.js # ← 中枢(broker)
producer.js # 面向客户端的服务器
consumer.js # 节点服务器 / 工作进程
.auth # 用于生产者和消费者验证的凭证(类似 .env)
.auth
在此处以 username:password:privileges 形式填写你的密钥凭证(特权请参见 repo):
sk:mypassword:4
jane:doeeee:1
john:doees:3
Server (Hive) – server.js
简单的非 TLS 设置(TLS 已支持 – 请参阅 GitHub 仓库):
import Bunny from "bunnimq";
import path from "path";
import { fileURLToPath } from "url";
Bunny({
port: 3000,
DEBUG: true,
cwd: path.dirname(fileURLToPath(import.meta.url)), // for .auth file
queue: {
Durable: true,
QueueExpiry: 0,
MessageExpiry: 3600
}
});
生产者 – producer.js
服务器浏览器和其他客户端与之通信。它接受请求并将作业推送到 Hive 中。
import BunnyMQ from "bunnimq-driver";
import fs from "node:fs/promises";
const bunny = new BunnyMQ({
port: 3000,
host: "localhost",
username: "sk",
password: "mypassword",
});
声明队列(如果不存在)
bunny.queueDeclare(
{
name: "transcode_queue",
config: {
QueueExpiry: 60,
MessageExpiry: 20,
AckExpiry: 10,
Durable: true,
noAck: false,
},
},
(res) => {
console.log("Queue creation:", res);
}
);
发布视频转码作业
演示时我们从本地文件夹读取视频(请将路径替换为你自己的):
async function processVideos() {
const videos = await fs.readdir(
"C:/Users/[path to a folder with videos]/Videos/Capcut/test"
); // 通常是存储桶链接
for (const video of videos) {
const job = {
id: Date.now() + Math.random().toString(36).substring(2),
input: `C:/Users/[path to a folder with videos]/Videos/Capcut/test/${video}`,
outputFormat: "webm",
};
// 放入队列
bunny.publish("transcode_queue", JSON.stringify(job), (res) => {
console.log(`Job ${job.id} published:`, res ? "ok" : "400");
});
}
}
processVideos();
消费者 – consumer.js
这些是负责拉取任务、转码视频并确认完成的工作节点。
import BunnyMQ from "bunnimq-driver";
import { spawn } from "child_process";
import path from "path";
const bunny = new BunnyMQ({
port: 3000,
host: "localhost",
username: "john",
password: "doees",
});
消费 transcode_queue
bunny.consume("transcode_queue", async (msg) => {
console.log("Received message:", msg);
try {
const { input, outputFormat } = JSON.parse(msg);
// Normalise paths
const absInput = path.resolve(input);
const output = absInput.replace(/\.[^.]+$/, `.${outputFormat}`);
console.log(
`Spawning: ffmpeg -i "${absInput}" -f ${outputFormat} "${output}" -y`
);
await new Promise((resolve, reject) => {
const ffmpeg = spawn(
"ffmpeg",
["-i", absInput, "-f", outputFormat, output, "-y"],
{ shell: true } // helps Windows find ffmpeg.exe
);
ffmpeg.on("error", reject);
// FFmpeg logs to stderr
ffmpeg.stderr.on("data", (chunk) => {
process.stderr.write(chunk);
});
ffmpeg.on("close", (code) => {
if (code === 0) {
console.log(`Transcoding complete: ${output}`);
// Acknowledge the message
bunny.Ack((ok) => console.log("Ack sent:", ok));
resolve();
} else {
reject(new Error(`FFmpeg exited with code ${code}`));
}
});
});
} catch (err) {
console.error("Failed to process job:", err);
// Optionally reject / requeue the message here
}
});
运行系统
-
启动 broker
node server.js -
启动一个或多个消费者(在不同的终端)
node consumer.js -
发布任务
node producer.js
您应该会看到每个消费者从队列中取出任务,调用 FFmpeg,并确认完成。扩展非常简单,只需在同一台或不同的机器上启动额外的 consumer.js 进程,这些机器能够访问 broker。
祝转码愉快!