在 Cloudflare Workers 中触发长作业
Source: Dev.to
问题:我的作业对 HTTP 来说太长了
我有一个处理后台管理 UI 的 Worker。其功能之一是点击按钮后启动一个耗时的后台进程——比如爬取、数据处理、批量操作等。
export default {
async fetch(request, env, ctx) {
if (request.url.endsWith('/admin/run-job')) {
await runHeavyJob(); // 😬
return new Response('Job complete!');
}
}
}
在开发环境可以正常工作。但在生产环境会因为 超时 而失败,因为 HTTP 请求有严格的限制:
| 计划 | CPU 时间 | 实际耗时 |
|---|---|---|
| 免费 | 10 ms | 30 s |
| Workers 付费 | 50 ms | 30 s |
| Business+ | 30 s | 30 s |
我的作业需要超过 30 秒的实际耗时,而且 CPU 时间也很快就用完。即使在付费计划上,我仍然会触碰到限制。
我尝试使用 ctx.waitUntil():
export default {
async fetch(request, env, ctx) {
if (request.url.endsWith('/admin/run-job')) {
ctx.waitUntil(runHeavyJob()); // 仍然不行! 😭
return new Response('Job started!');
}
}
}
waitUntil() 并不会延长超时时间;它只是在发送响应后让你进行清理工作。隔离环境仍会在相同的时间限制后关闭。
为什么我不能直接使用 scheduled()
我曾想复用已有的 cron 作业:
export default {
async fetch(request, env, ctx) {
if (request.url.endsWith('/admin/run-job')) {
// 能不能……直接调用 scheduled()? 🤔
await this.scheduled(); // 不行!
return new Response('Done!');
}
},
async scheduled(event, env, ctx) {
await runHeavyJob(); // 这很好用!
}
}
scheduled() 只能由 Cloudflare 的 cron 系统触发,不能在代码中直接调用。我尝试的变通办法包括:
- 调用 Cloudflare API 触发 cron(需要外部认证,且不是即时的)
- 设置 webhook 调用外部服务(违背了 Workers 的初衷)
- 在 KV 中存 flag 并每分钟轮询一次(可行,但感觉很 hack)
灵感闪现:队列正是为此而生
Cloudflare Queues 提供了第三种调用处理程序:
export default {
async fetch(request, env, ctx) { /* ... */ },
async scheduled(event, env, ctx) { /* ... */ },
async queue(batch, env, ctx) { /* ... */ } // 👈 这一个!
}
按处理程序类型的执行限制
| 处理程序 | CPU 时间 | 适用场景 |
|---|---|---|
fetch() | 10‑50 ms(大多数计划) | 快速 API、UI |
scheduled() | 30 s | 周期性任务 |
queue() | 无限 ⚡ | 重型处理 |
Queue 处理程序没有 CPU 时间限制——只有以分钟计的实际耗时限制。
我的实际解决方案
Worker 1:管理 UI(生产者)
export default {
async fetch(request, env, ctx) {
if (request.url.endsWith('/admin/run-job')) {
// 入队一条消息
await env.MY_QUEUE.send({
type: 'heavy-job',
triggeredBy: 'admin',
timestamp: Date.now()
});
return new Response('Job queued!');
}
}
}
Worker 2:作业运行器(消费者)
export default {
async queue(batch, env, ctx) {
for (const message of batch.messages) {
const { type, triggeredBy } = message.body;
if (type === 'heavy-job') {
await runHeavyJob(); // 无限 CPU 时间! 🎉
message.ack();
}
}
}
}
为什么可行:
- UI Worker 只负责入队并返回,保持快速。
- 作业 Worker 在无限 CPU 时间下运行。
- 队列会自动处理重试。
- 两个 Worker 可以独立扩展。
- 执行几乎是即时的(没有轮询延迟)。
重要说明:处理程序之间不争抢资源
每一次处理程序调用都在各自独立的执行上下文中运行,因此正在运行的 queue 作业不会拖慢 HTTP 请求。它们唯一共享的是 代码包(更大的包会导致冷启动变慢)和 部署(一个处理程序的 bug 会影响整个 Worker)。
如果愿意,也可以在同一个 Worker 中合并这三种处理程序:
export default {
async fetch(request, env, ctx) {
await env.MY_QUEUE.send({ type: 'job' });
return new Response('Queued!');
},
async scheduled(event, env, ctx) {
await env.MY_QUEUE.send({ type: 'cron-job' });
},
async queue(batch, env, ctx) {
await runHeavyJob(); // 不会拖慢 fetch()
}
}
我更倾向于把它们拆分,以保持 UI 包体积小、实现独立部署,并保持更清晰的关注点分离。
我考虑过的其他方案
Cron 轮询
在 KV 中设置 flag,并在 scheduled() 中每分钟检查一次:
export default {
async fetch(request, env, ctx) {
await env.KV.put('pending-job', 'true');
return new Response('Job will run soon');
},
async scheduled(event, env, ctx) {
const pending = await env.KV.get('pending-job');
if (pending) {
await runHeavyJob();
await env.KV.delete('pending-job');
}
}
}
可行,但不是即时的——受限于 cron 间隔(最小 1 分钟)。
Durable Object 警报
Durable Objects 可以设置几乎立即触发的警报:
export class JobRunner {
async fetch(request) {
await this.storage.setAlarm(Date.now() + 100); // 100 ms
return new Response('Alarm set');
}
async alarm() {
await runHeavyJob(); // 在 DO 上下文中运行
}
}
优雅,但需要搭建 Durable Objects,对于简单的后台作业来说可能显得有些重量级。
我的建议
对于按需的长时作业,使用 Queues。 它们正是为这种场景而设计的:
- 无限 CPU 时间
- 内置重试逻辑
- 简单的 API
- 自动扩展
- 几乎即时执行
最小化配置
# wrangler.toml
[[queues.producers]]
queue = "my-jobs"
binding = "MY_QUEUE"
[[queues.consumers]]
queue = "my-jobs"
max_batch_size = 10
max_batch_timeout = 30
结语
- 不要与平台对抗。 试图让
fetch()做它不擅长的事只会浪费时间。 - 阅读限制。 了解 CPU 与实际耗时的区别帮我省下了数小时的调试。
- Queues 被低估了。 它们不仅适用于分布式系统,也非常适合单体应用中的后台作业。