Self-Scheduling Recurring Cloud Tasks (with Terraform + Python code)
Source: Dev.to

Overview
Using a fully serverless cloud set‑up, sometimes the easy question “how can I run this function on a recurring basis?” isn’t so easy to answer. A traditional CRON job won’t work here.
Instead, you can achieve recurring execution by utilising scheduled Cloud Tasks that reschedule themselves.
The workflow
- Set up a Cloud Tasks queue.
- Add code to the Cloud Run function that enqueues the next task.
- Create the initial task to start the chain.
Service Account
Create the service account that will run the Cloud Function.
resource "google_service_account" "" {
account_id = ""
display_name = "My Service Account"
description = "Service account used for my function"
project = var.project_id
}
Grant the required roles for Cloud Tasks and invoking Cloud Run.
resource "google_project_iam_member" "" {
for_each = toset([
"roles/run.invoker",
"roles/cloudtasks.enqueuer"
])
project = var.project_id
role = each.value
member = "serviceAccount:${google_service_account..email}"
depends_on = [google_service_account.]
}
Allow the service account to impersonate itself when queuing the next task.
resource "google_service_account_iam_member" "" {
service_account_id = google_service_account..name
role = "roles/iam.serviceAccountUser"
member = "serviceAccount:${google_service_account..email}"
}
Cloud Task Queue
Create the Cloud Tasks queue that will hold the scheduled HTTP tasks.
resource "google_cloud_tasks_queue" "" {
name = ""
location = var.region
http_target {
http_method = "POST"
oidc_token {
service_account_email =
}
}
}
Queue Task from Function
Add a helper function to your Cloud Run (or Cloud Functions) code that enqueues the next task. Adjust days_increment and the scheduled time as needed.
import json
from datetime import datetime, timedelta, timezone
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
def cloud_task_scheduler(days_increment: int = 1):
try:
client = tasks_v2.CloudTasksClient()
parent = client.queue_path(GCP_PROJECT_ID, LOCATION, CLOUD_TASK_QUEUE_ID)
FUNCTION_URL = ""
# Payload for the next invocation
payload = {"schedule_next_cloud_task": True}
body_bytes = json.dumps(payload).encode("utf-8")
# Schedule for the desired future time (e.g., 16:00 UTC)
scheduled_date = (
datetime.now(timezone.utc) + timedelta(days=days_increment)
).replace(hour=16, minute=0, second=0, microsecond=0)
ts = timestamp_pb2.Timestamp()
ts.FromDatetime(scheduled_date)
task = {
"http_request": {
"http_method": tasks_v2.HttpMethod.POST,
"url": FUNCTION_URL,
"headers": {"Content-Type": "application/json"},
"body": body_bytes,
},
"schedule_time": ts,
}
client.create_task(parent=parent, task=task)
except Exception as e:
raise Exception(f"Error scheduling next Cloud Task: {e}")
Call cloud_task_scheduler() at the end of your function (or wherever appropriate) to ensure the next execution is queued.
Schedule the First Task
Kick‑off the chain by creating the initial task via the gcloud CLI.
gcloud tasks create-http-task my-first-task \
--queue=my-queue \
--url="my-function-url" \
--method=POST \
--body-content='{"schedule_next_cloud_task": true}' \
--header="Content-Type":"application/json" \
--oidc-service-account-email="my-sa-email" \
--location="region" \
--schedule-time="YYYY-MM-DDTHH:MM:SS+00:00"
After running the command, you should see the first task sitting in the queue, ready to invoke your function. Once the function runs, it will enqueue the next task, creating a self‑sustaining schedule.
Why This Works
- Cost‑effective – Cloud Tasks are essentially free within the generous free tier. You only pay for the compute time of your function.
- Scalable to zero – While waiting for the next scheduled execution, no resources are consumed.
- Flexible – Adjust the interval, time of day, or pass arbitrary data via the HTTP body to handle more complex workflows (e.g., splitting long‑running jobs).
This pattern provides a simple, serverless way to run recurring jobs without managing external cron services.