GCP AgentFlow: Building Agentic AI Orchestration on Google Cloud
Source: Dev.to
Overview
Modern cloud applications don’t just move data — they make decisions. A file arrives, a risk score is calculated, and the system needs to route it, quarantine it, or escalate it — all without a human in the loop. That’s the promise of agentic AI, and GCP AgentFlow is the toolkit built to make it practical on Google Cloud.
Traditional event‑driven architectures are reactive: an event arrives, a function runs, data moves. Agentic architectures go a step further — they evaluate context, apply decision logic, and determine what should happen next based on the current state of the system.
On Google Cloud this typically involves:
- Pub/Sub – event ingestion and routing
- Cloud Workflows – multi‑step orchestration
- Datastore – tracking operational state across events
- BigQuery – logging decisions and analytics
- ML models – scoring, classification, recommendation
GCP AgentFlow provides the connective tissue across all of these components without imposing a rigid architecture.
Library Design
The library is intentionally lightweight and composable. Each component can be used independently or wired together into a full agentic pipeline.
Decision Engine
```python
from gcp_agentflow import AgentDecisionInput, decide_next_action

event = AgentDecisionInput(
    event_type="file_arrived",
    source="pubsub",
    risk_score=72,
    payload={"bucket": "incoming", "name": "file.csv"}
)

decision = decide_next_action(event)
print(decision.action)  # e.g., "quarantine"
print(decision.reason)  # e.g., "Risk score exceeds threshold of 70"
```
- The `AgentDecisionInput` schema is flexible — you provide the risk score from your ML model, and the decision engine applies your routing rules.
- This separation keeps ML logic and orchestration logic cleanly decoupled.
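The routing rules themselves aren't shown in the article, but a threshold rule of the kind the example implies might look like the sketch below. The `Decision` dataclass, `route_by_risk` function, and `RISK_THRESHOLD` constant are illustrative stand-ins, not the library's actual API:

```python
from dataclasses import dataclass

RISK_THRESHOLD = 70  # illustrative cutoff matching the example output above

@dataclass
class Decision:
    action: str
    reason: str

def route_by_risk(event_type: str, risk_score: int) -> Decision:
    """Toy routing rule: quarantine high-risk file events, process the rest."""
    if event_type == "file_arrived" and risk_score > RISK_THRESHOLD:
        return Decision(
            action="quarantine",
            reason=f"Risk score exceeds threshold of {RISK_THRESHOLD}",
        )
    return Decision(action="process", reason="Risk score within tolerance")
```

Keeping rules as plain, deterministic functions like this makes them easy to unit-test without any cloud infrastructure.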
Pub/Sub Wrapper
A safe, retry‑aware wrapper around the Pub/Sub publish API. It handles JSON serialization, message attribute injection, and error logging, keeping event‑emission code clean and consistent.
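The wrapper's internals aren't reproduced here. A minimal sketch of the same idea, with the real Pub/Sub client abstracted behind a `publish_fn` callable (so nothing below should be read as the library's actual signature), could look like:

```python
import json
import time
from typing import Any, Callable, Dict

def publish_with_retry(
    publish_fn: Callable[[bytes, Dict[str, str]], None],
    payload: Dict[str, Any],
    attributes: Dict[str, str],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> None:
    """Serialize payload to JSON and publish, retrying with exponential backoff."""
    data = json.dumps(payload).encode("utf-8")
    for attempt in range(1, max_attempts + 1):
        try:
            publish_fn(data, attributes)
            return
        except Exception:
            if attempt == max_attempts:
                raise  # retry budget exhausted: surface the error to the caller
            time.sleep(base_delay * 2 ** (attempt - 1))  # 0.5s, 1s, 2s, ...
```

Injecting `publish_fn` keeps the retry logic testable without a live Pub/Sub topic.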
BigQuery Logger
Every decision the agent makes is logged as a structured analytics event in BigQuery, providing a complete audit trail and enabling downstream analysis of automation behavior.
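The article doesn't specify the logged schema; one plausible shape for a structured decision row, flattening the nested payload into a JSON string column (all field names here are hypothetical), is:

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict

def decision_to_row(event_type: str, action: str, reason: str,
                    risk_score: int, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one agent decision into a BigQuery-friendly row (hypothetical schema)."""
    return {
        "ts": datetime.now(timezone.utc).isoformat(),  # decision timestamp
        "event_type": event_type,
        "action": action,
        "reason": reason,
        "risk_score": risk_score,
        "payload_json": json.dumps(payload, sort_keys=True),  # nested payload as a JSON string
    }
```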
Datastore State Manager
Saves and retrieves operational state by entity key. This makes the agent stateful — it can check whether a file has been seen before, whether a workflow step was already completed, or whether a retry budget has been exhausted.
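The state manager's exact interface isn't documented in the article, but the idempotency pattern it enables can be sketched with an in-memory dict standing in for Datastore (class and method names are illustrative):

```python
from typing import Any, Callable, Dict, Optional

class InMemoryStateStore:
    """Stand-in for a Datastore-backed state manager (illustrative API)."""
    def __init__(self) -> None:
        self._entities: Dict[str, Dict[str, Any]] = {}

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        return self._entities.get(key)

    def put(self, key: str, state: Dict[str, Any]) -> None:
        self._entities[key] = state

def process_once(store: InMemoryStateStore, file_key: str,
                 handler: Callable[[str], None]) -> bool:
    """Run handler for file_key only if it hasn't been handled before."""
    if store.get(file_key) is not None:
        return False  # already seen: skip duplicate processing on retry
    handler(file_key)
    store.put(file_key, {"status": "processed"})
    return True
```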
Composing a Real Agentic Pipeline
- Ingestion – A file lands in a Cloud Storage bucket and triggers a Pub/Sub event.
- Processing – A Cloud Run service receives the event and calls `decide_next_action` with a risk score from your ML model.
- Decision – The engine returns `quarantine` because the risk score exceeds the threshold.
- State Update – The agent updates Datastore to mark the file as quarantined with a timestamp.
- Logging – The BigQuery logger records the full decision context for audit and analytics.
- Routing – A Pub/Sub message is published to a quarantine topic, triggering a downstream review workflow in Cloud Workflows.
The flow is observable, auditable, and re‑runnable — if any step fails, Datastore state prevents duplicate processing on retry.
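The six steps above can be wired together as in the sketch below, with stubbed components in place of the real services. Every name here is illustrative; the library's actual interfaces may differ:

```python
from typing import Any, Callable, Dict, List

def handle_event(event: Dict[str, Any],
                 score_fn: Callable[[Dict[str, Any]], int],
                 decide_fn: Callable[[int], str],
                 state: Dict[str, Any],
                 log_rows: List[Dict[str, Any]],
                 publish: Callable[[str, Dict[str, Any]], None]) -> str:
    """Illustrative pipeline: score -> decide -> update state -> log -> route."""
    key = event["name"]
    if state.get(key):                      # idempotency guard for retries
        return "skipped"
    risk = score_fn(event)                  # ML scoring (stubbed)
    action = decide_fn(risk)                # decision engine (stubbed)
    state[key] = {"action": action}         # Datastore stand-in
    log_rows.append({"file": key, "risk": risk, "action": action})  # BigQuery stand-in
    if action == "quarantine":
        publish("quarantine-topic", event)  # Pub/Sub stand-in
    return action
```

A second delivery of the same event hits the state guard and returns `"skipped"` instead of re-running the pipeline, which is what makes the flow safely re-runnable.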
CLI Simulation
The CLI lets you simulate agent decisions without spinning up infrastructure:
```shell
gcp-agentflow decide --event-type file_arrived --risk-score 72
```
This is useful in CI pipelines for validating routing rules before deploying to production.
Use Cases
- Compliance‑driven file processing – Route incoming files through validation, virus scanning, and approval gates based on ML risk scores.
- Multi‑step data ingestion pipelines – Decide at each stage whether to proceed, retry with backoff, or dead‑letter a record, with full state tracking in Datastore.
- Fraud and anomaly detection – Score transactions in real time and trigger escalation workflows when confidence thresholds are crossed.
- GDPR and regulatory data routing – Automatically classify and route records, logging every decision for audit trails in BigQuery.
- AI agent backends – Use GCP AgentFlow as the decision and state layer behind Vertex AI Agent Builder agents, giving AI agents persistent memory via Datastore and structured action logging via BigQuery.
Installation
```shell
pip install gcp-agentflow
```
The library runs cleanly in Cloud Run containers, Dataflow workers, Cloud Functions, or local environments. It has no heavy dependencies — only Google Cloud client libraries and standard Python.
Publishing the Package
```shell
python -m pip install --upgrade build twine
python -m build
twine check dist/*
twine upload dist/*
```
When to Use GCP AgentFlow
GCP AgentFlow is not a full workflow engine (that’s Cloud Workflows) nor an ML platform (that’s Vertex AI). It fills the gap between those systems: the decision logic, state management, and event wiring that every agentic pipeline needs but nobody wants to rewrite from scratch.
If you’re building event‑driven automation on Google Cloud and find yourself writing the same routing logic, state‑check boilerplate, and BigQuery logging code across multiple services, GCP AgentFlow provides the abstraction layer you’ve been missing.
License
MIT License