Part 7: Gold Layer – Metrics, Watermarks, and Aggregations
Source: Dev.to

Gold tables answer business questions directly.
Examples:
- Trips per hour by region
- Revenue per ZIP
- Average distance by time window
Gold tables are:
- Aggregated
- Optimized
- Dashboard‑ready
Introducing Event Time & Watermarking
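To handle late-arriving data, the gold layer defines a watermark on the event-time column and aggregates over time windows, so each aggregation is closed based on event time rather than arrival time. In the example below, events are grouped into 1‑hour windows on tpep_pickup_datetime with a 30‑minute watermark. Spark tracks the latest tpep_pickup_datetime it has seen and sets the watermark 30 minutes behind it. A window stays open, and late events can still update it, until the watermark exceeds the window's end time. At that point the window is closed and its aggregated results are finalized. Because the query writes in append mode, a window's row reaches the gold table only after the window has closed.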
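# Import under an alias so sum() and friends do not shadow Python built-ins
from pyspark.sql import functions as F

# Read the enriched silver table as a stream
silver_df = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_df = (
    silver_df
    # Accept events that arrive up to 30 minutes late
    .withWatermark("tpep_pickup_datetime", "30 minutes")
    .groupBy(
        F.window("tpep_pickup_datetime", "1 hour"),  # 1-hour tumbling windows
        "region",
    )
    .agg(
        F.count("*").alias("trip_count"),
        F.sum("fare_amount").alias("total_fare"),
        F.avg("trip_distance").alias("avg_distance"),
    )
)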
# Stream the result to a gold Delta table
(
    gold_df.writeStream
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_metrics")
    .outputMode("append")
    .toTable("nyc_taxi.gold.taxi_metrics")
)
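To make the window-closing rule concrete, here is a minimal pure-Python sketch that replays a handful of pickup times and applies the same 1-hour window and 30-minute watermark. It is a simplified model, not Spark itself: it moves the watermark after every event, whereas Spark advances it between micro-batches, and the function names and sample timestamps are made up for illustration.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)     # mirrors window("tpep_pickup_datetime", "1 hour")
DELAY = timedelta(minutes=30)   # mirrors withWatermark(..., "30 minutes")


def window_start(ts):
    """Floor a timestamp to the start of its 1-hour tumbling window."""
    return ts.replace(minute=0, second=0, microsecond=0)


def simulate(event_times):
    """Replay events in arrival order.

    Returns (finalized, dropped): finalized is a list of
    (window start, trip count) in closing order, dropped is the list of
    events whose window had already closed when they arrived.
    """
    open_counts = {}   # window start -> trip count for windows still open
    finalized = []
    dropped = []
    max_event_time = None

    for ts in event_times:
        # The watermark trails the latest event time seen so far.
        max_event_time = ts if max_event_time is None else max(max_event_time, ts)
        watermark = max_event_time - DELAY

        start = window_start(ts)
        if watermark > start + WINDOW:
            dropped.append(ts)          # too late: its window is already closed
        else:
            open_counts[start] = open_counts.get(start, 0) + 1

        # Close every open window whose end the watermark has passed.
        for open_start in sorted(open_counts):
            if watermark > open_start + WINDOW:
                finalized.append((open_start, open_counts.pop(open_start)))

    return finalized, dropped


# Hypothetical pickup times, listed in arrival order (not event order).
EVENTS = [
    datetime(2024, 1, 1, 10, 5),
    datetime(2024, 1, 1, 10, 40),
    datetime(2024, 1, 1, 11, 10),
    datetime(2024, 1, 1, 10, 55),   # late, but the 10:00 window is still open
    datetime(2024, 1, 1, 11, 35),   # watermark reaches 11:05, closing 10:00-11:00
    datetime(2024, 1, 1, 10, 58),   # arrives after its window has closed
    datetime(2024, 1, 1, 12, 45),   # watermark reaches 12:15, closing 11:00-12:00
]

if __name__ == "__main__":
    finalized, dropped = simulate(EVENTS)
    print(finalized)
    print(dropped)
```

In this replay the 10:55 trip is still counted because the watermark has only reached 10:40 when it arrives, while the 10:58 trip is discarded because the watermark has already moved to 11:05. The 12:00 window stays open at the end, which is why an append-mode run does not write the most recent window yet.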
Detailed Taxi‑Trip Metrics View
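The gold layer can hold more than one table, each shaped for a different set of questions. The following example builds a second, more detailed table of taxi‑trip metrics, broken down by date, hour, pickup ZIP, and region.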
# Import under an alias so sum() and friends do not shadow Python built-ins
from pyspark.sql import functions as F

# Read the enriched silver table as a stream
silver_stream = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_stream = (
    silver_stream
    # Accept events that arrive up to 30 minutes late
    .withWatermark("tpep_pickup_datetime", "30 minutes")
    # Derive calendar columns that are convenient for dashboard filters
    .withColumn("trip_date", F.to_date("tpep_pickup_datetime"))
    .withColumn("trip_hour", F.hour("tpep_pickup_datetime"))
    .groupBy(
        F.window("tpep_pickup_datetime", "1 hour"),  # 1-hour tumbling windows
        "trip_date",
        "trip_hour",
        "pickup_zip",
        "region",
    )
    .agg(
        F.count("*").alias("total_trips"),
        F.sum("fare_amount").alias("total_revenue"),
        F.avg("fare_amount").alias("avg_fare"),
        F.avg("trip_distance").alias("avg_distance"),
    )
)
# Stream the result to a second gold Delta table
(
    gold_stream.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_trip_metrics")
    .outputMode("append")
    .toTable("nyc_taxi.gold.taxi_trip_metrics")
)
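The query above groups by the window and also by trip_date and trip_hour. The pure-Python sketch below illustrates why the two extra keys should not split a window into more rows: assuming the session time zone is UTC, 1-hour tumbling windows align to the top of the hour, so every trip in a window shares the same date and hour. The helper names and the sample ZIP and region values are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
WINDOW = timedelta(hours=1)


def window_bounds(ts):
    """Return the [start, end) tumbling window containing ts.

    Windows are aligned to the Unix epoch, so with a UTC clock a
    1-hour window always starts at the top of an hour.
    """
    offset = (ts - EPOCH) % WINDOW
    start = ts - offset
    return start, start + WINDOW


def group_key(ts, pickup_zip, region):
    """Build the same grouping key as the streaming query (illustrative only)."""
    start, end = window_bounds(ts)
    return (start, end, ts.date(), ts.hour, pickup_zip, region)


if __name__ == "__main__":
    # Two hypothetical trips in the same hour, ZIP, and region.
    early = group_key(datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc), "10001", "Manhattan")
    late = group_key(datetime(2024, 1, 1, 10, 55, tzinfo=timezone.utc), "10001", "Manhattan")
    print(early == late)
```

With a session time zone whose UTC offset is not a whole number of hours, the window boundaries no longer fall on the local hour, and one window could then contain two different trip_hour values. In that case the extra keys do change the grain of the table.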
The data is now aggregated and available in gold Delta tables, ready for business insights.
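One thing to watch when building insights on top of these tables is rolling hourly rows up to a coarser grain, such as revenue per ZIP. Counts and sums can simply be added, but an average has to be recomputed from the summed revenue and summed trips rather than by averaging avg_fare. The sketch below shows the idea in plain Python; the rows are made-up values shaped like nyc_taxi.gold.taxi_trip_metrics, and revenue_per_zip is a hypothetical helper.

```python
from collections import defaultdict


def revenue_per_zip(rows):
    """Roll hourly gold rows up to one row per pickup ZIP.

    total_trips and total_revenue are additive; avg_fare is recomputed
    as total_revenue / total_trips so busy hours carry more weight.
    """
    totals = defaultdict(lambda: {"total_trips": 0, "total_revenue": 0.0})
    for row in rows:
        bucket = totals[row["pickup_zip"]]
        bucket["total_trips"] += row["total_trips"]
        bucket["total_revenue"] += row["total_revenue"]

    return {
        pickup_zip: {**bucket, "avg_fare": bucket["total_revenue"] / bucket["total_trips"]}
        for pickup_zip, bucket in totals.items()
    }


# Made-up hourly rows for illustration only.
ROWS = [
    {"pickup_zip": "10001", "trip_hour": 10, "total_trips": 10, "total_revenue": 100.0, "avg_fare": 10.0},
    {"pickup_zip": "10001", "trip_hour": 11, "total_trips": 30, "total_revenue": 600.0, "avg_fare": 20.0},
    {"pickup_zip": "10002", "trip_hour": 10, "total_trips": 5, "total_revenue": 50.0, "avg_fare": 10.0},
]

if __name__ == "__main__":
    for pickup_zip, metrics in revenue_per_zip(ROWS).items():
        print(pickup_zip, metrics)
```

For ZIP 10001 the recomputed average fare is 700 / 40 = 17.5, whereas a plain mean of the two hourly avg_fare values would give 15.0 and understate the busier hour.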
Happy learning!