自调度的周期性云任务(使用 Terraform + Python 代码)
发布: (2026年1月12日 GMT+8 17:48)
4 分钟阅读
原文: Dev.to
Source: Dev.to

概览
使用完全无服务器的云部署时,虽然“如何定期运行此函数?”看似简单,但答案并不容易。传统的 CRON 任务在这里行不通。
相反,你可以通过利用 计划的 Cloud Tasks(自行重新调度)来实现周期性执行。
工作流程
- 设置一个 Cloud Tasks 队列。
- 在 Cloud Run 函数中添加代码,以将下一个任务加入队列。
- 创建初始任务以启动整个链条。
服务账号
创建将在 Cloud Function 中运行的服务账号。
resource "google_service_account" "" {
account_id = ""
display_name = "My Service Account"
description = "Service account used for my function"
project = var.project_id
}
授予 Cloud Tasks 所需的角色以及调用 Cloud Run 的权限。
resource "google_project_iam_member" "" {
for_each = toset([
"roles/run.invoker",
"roles/cloudtasks.enqueuer"
])
project = var.project_id
role = each.value
member = "serviceAccount:${google_service_account..email}"
depends_on = [google_service_account.]
}
允许服务账号在排队下一个任务时冒充自身。
resource "google_service_account_iam_member" "" {
service_account_id = google_service_account..name
role = "roles/iam.serviceAccountUser"
member = "serviceAccount:${google_service_account..email"
}
云任务队列
创建用于保存计划 HTTP 任务的 Cloud Tasks 队列。
resource "google_cloud_tasks_queue" "" {
name = ""
location = var.region
http_target {
http_method = "POST"
oidc_token {
service_account_email =
}
}
}
从函数排队任务
添加一个辅助函数到你的 Cloud Run(或 Cloud Functions)代码中,用于将下一个任务加入队列。根据需要调整 days_increment 和计划执行的时间。
import json
from datetime import datetime, timedelta, timezone
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
def cloud_task_scheduler(days_increment: int = 1):
try:
client = tasks_v2.CloudTasksClient()
parent = client.queue_path(GCP_PROJECT_ID, LOCATION, CLOUD_TASK_QUEUE_ID)
FUNCTION_URL = ""
# Payload for the next invocation
payload = {"schedule_next_cloud_task": True}
body_bytes = json.dumps(payload).encode("utf-8")
# Schedule for the desired future time (e.g., 16:00 UTC)
scheduled_date = (
datetime.now(timezone.utc) + timedelta(days=days_increment)
).replace(hour=16, minute=0, second=0, microsecond=0)
ts = timestamp_pb2.Timestamp()
ts.FromDatetime(scheduled_date)
task = {
"http_request": {
"http_method": tasks_v2.HttpMethod.POST,
"url": FUNCTION_URL,
"headers": {"Content-Type": "application/json"},
"body": body_bytes,
},
"schedule_time": ts,
}
client.create_task(parent=parent, task=task)
except Exception as e:
raise Exception(f"Error scheduling next Cloud Task: {e}")
在函数结束时(或合适的位置)调用 cloud_task_scheduler(),以确保下次执行已被加入队列。
安排第一个任务
通过 gcloud CLI 创建初始任务,启动任务链。
gcloud tasks create-http-task my-first-task \
--queue=my-queue \
--url="my-function-url" \
--method=POST \
--body-content='{"schedule_next_cloud_task": true}' \
--header="Content-Type":"application/json" \
--oidc-service-account-email="my-sa-email" \
--location="region" \
--schedule-time="YYYY-MM-DDTHH:MM:SS+00:00"
运行该命令后,你应该会看到第一个任务已在队列中等待,准备调用你的函数。函数执行后,它会将下一个任务加入队列,从而形成自我维持的调度。
为什么这样有效
- 成本效益高 – Cloud Tasks 在慷慨的免费额度内基本免费。您只需为函数的计算时间付费。
- 可伸缩至零 – 在等待下一次计划执行期间,不会消耗任何资源。
- 灵活 – 可以调整间隔、时间,或通过 HTTP 正文传递任意数据,以处理更复杂的工作流(例如,拆分长时间运行的任务)。
这种模式提供了一种简单、无服务器的方式来运行周期性任务,而无需管理外部 cron 服务。