自调度的周期性云任务(使用 Terraform + Python 代码)

发布: (2026年1月12日 GMT+8 17:48)
4 分钟阅读
原文: Dev.to

Source: Dev.to

Self‑Scheduling Recurring Cloud Tasks(使用 Terraform + Python 代码)的封面图

概览

使用完全无服务器的云部署时,虽然“如何定期运行此函数?”看似简单,但答案并不容易。传统的 CRON 任务在这里行不通。
相反,你可以通过利用 计划的 Cloud Tasks(自行重新调度)来实现周期性执行。

工作流程

  1. 设置一个 Cloud Tasks 队列。
  2. 在 Cloud Run 函数中添加代码,以将下一个任务加入队列。
  3. 创建初始任务以启动整个链条。

服务账号

创建将在 Cloud Function 中运行的服务账号。

resource "google_service_account" "" {
  account_id    = ""
  display_name = "My Service Account"
  description   = "Service account used for my function"
  project       = var.project_id
}

授予 Cloud Tasks 所需的角色以及调用 Cloud Run 的权限。

resource "google_project_iam_member" "" {
  for_each = toset([
    "roles/run.invoker",
    "roles/cloudtasks.enqueuer"
  ])

  project = var.project_id
  role    = each.value
  member  = "serviceAccount:${google_service_account..email}"

  depends_on = [google_service_account.]
}

允许服务账号在排队下一个任务时冒充自身。

resource "google_service_account_iam_member" "" {
  service_account_id = google_service_account..name
  role               = "roles/iam.serviceAccountUser"
  member             = "serviceAccount:${google_service_account..email"
}

云任务队列

创建用于保存计划 HTTP 任务的 Cloud Tasks 队列。

resource "google_cloud_tasks_queue" "" {
  name     = ""
  location = var.region

  http_target {
    http_method = "POST"
    oidc_token {
      service_account_email = 
    }
  }
}

从函数排队任务

添加一个辅助函数到你的 Cloud Run(或 Cloud Functions)代码中,用于将下一个任务加入队列。根据需要调整 days_increment 和计划执行的时间。

import json
from datetime import datetime, timedelta, timezone
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

def cloud_task_scheduler(days_increment: int = 1):
    try:
        client = tasks_v2.CloudTasksClient()
        parent = client.queue_path(GCP_PROJECT_ID, LOCATION, CLOUD_TASK_QUEUE_ID)
        FUNCTION_URL = ""

        # Payload for the next invocation
        payload = {"schedule_next_cloud_task": True}
        body_bytes = json.dumps(payload).encode("utf-8")

        # Schedule for the desired future time (e.g., 16:00 UTC)
        scheduled_date = (
            datetime.now(timezone.utc) + timedelta(days=days_increment)
        ).replace(hour=16, minute=0, second=0, microsecond=0)

        ts = timestamp_pb2.Timestamp()
        ts.FromDatetime(scheduled_date)

        task = {
            "http_request": {
                "http_method": tasks_v2.HttpMethod.POST,
                "url": FUNCTION_URL,
                "headers": {"Content-Type": "application/json"},
                "body": body_bytes,
            },
            "schedule_time": ts,
        }

        client.create_task(parent=parent, task=task)

    except Exception as e:
        raise Exception(f"Error scheduling next Cloud Task: {e}")

在函数结束时(或合适的位置)调用 cloud_task_scheduler(),以确保下次执行已被加入队列。

安排第一个任务

通过 gcloud CLI 创建初始任务,启动任务链。

gcloud tasks create-http-task my-first-task \
  --queue=my-queue \
  --url="my-function-url" \
  --method=POST \
  --body-content='{"schedule_next_cloud_task": true}' \
  --header="Content-Type":"application/json" \
  --oidc-service-account-email="my-sa-email" \
  --location="region" \
  --schedule-time="YYYY-MM-DDTHH:MM:SS+00:00"

运行该命令后,你应该会看到第一个任务已在队列中等待,准备调用你的函数。函数执行后,它会将下一个任务加入队列,从而形成自我维持的调度。

为什么这样有效

  • 成本效益高 – Cloud Tasks 在慷慨的免费额度内基本免费。您只需为函数的计算时间付费。
  • 可伸缩至零 – 在等待下一次计划执行期间,不会消耗任何资源。
  • 灵活 – 可以调整间隔、时间,或通过 HTTP 正文传递任意数据,以处理更复杂的工作流(例如,拆分长时间运行的任务)。

这种模式提供了一种简单、无服务器的方式来运行周期性任务,而无需管理外部 cron 服务。

Back to Blog

相关文章

阅读更多 »

你好,我是新人。

嗨!我又回到 STEM 的领域了。我也喜欢学习能源系统、科学、技术、工程和数学。其中一个项目是…