Part 7: Gold Layer – Metrics, Watermarks, and Aggregations

Published: (January 2, 2026 at 05:51 AM EST)
Source: Dev.to

Gold tables answer business questions directly

Examples

  • Trips per hour by region
  • Revenue per ZIP
  • Average distance by time window

Gold tables are:

  • Aggregated
  • Optimized
  • Dashboard‑ready

Introducing Event Time & Watermarking

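To handle late-arriving data in the gold layer, we add a watermark and group by event-time windows, so each aggregation closes based on when trips happened rather than when their rows arrived. In the Spark example that follows, trips are grouped into 1‑hour windows on tpep_pickup_datetime with a 30‑minute watermark. Spark tracks the latest tpep_pickup_datetime it has seen and keeps the watermark 30 minutes behind it. A window is closed and its aggregated results are finalized once the watermark (max tpep_pickup_datetime received – 30 minutes) passes the window’s end time; trips for that window that arrive afterwards are too late and may be dropped. With outputMode("append"), a window’s row is written to the gold table only after the window has closed.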
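
The closing rule can be modelled in plain Python, independent of Spark. The sketch below is a simplified illustration, not Spark's implementation: it advances the watermark after every event (Spark advances it between micro-batches), the function names are hypothetical, and the sample trips are made up.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)     # tumbling window length
DELAY = timedelta(minutes=30)   # watermark delay


def window_start(ts):
    """Floor a timestamp to the start of its 1-hour window."""
    return ts.replace(minute=0, second=0, microsecond=0)


def simulate(events):
    """Return (finalized, dropped) for a list of (event_time, region) pairs."""
    open_windows = {}      # (window_start, region) -> trip count so far
    finalized = []         # rows emitted once their window has closed
    dropped = []           # events that arrived behind the watermark
    max_event_time = None

    for ts, region in events:
        start = window_start(ts)
        if max_event_time is not None:
            watermark = max_event_time - DELAY
            if start + WINDOW <= watermark:
                dropped.append((ts, region))   # its window is already closed
                continue

        key = (start, region)
        open_windows[key] = open_windows.get(key, 0) + 1
        if max_event_time is None or ts > max_event_time:
            max_event_time = ts

        # Close every window whose end is at or behind the new watermark.
        watermark = max_event_time - DELAY
        closed = sorted(k for k in open_windows if k[0] + WINDOW <= watermark)
        for k in closed:
            finalized.append((k[0], k[1], open_windows.pop(k)))

    return finalized, dropped


def at(hour, minute):
    """Build a timestamp on one made-up day."""
    return datetime(2026, 1, 1, hour, minute)


# Made-up arrival order; the 10:50 and 10:55 trips arrive out of order.
events = [
    (at(10, 5), "Manhattan"),
    (at(10, 40), "Manhattan"),
    (at(11, 10), "Manhattan"),
    (at(10, 50), "Manhattan"),   # late, but the 10:00 window is still open
    (at(11, 35), "Manhattan"),   # moves the watermark to 11:05
    (at(10, 55), "Manhattan"),   # arrives after the 10:00 window closed
]

finalized, dropped = simulate(events)
print(finalized)
print(dropped)
```

In this run the 10:00 window is finalized with three trips once the 11:35 trip moves the watermark to 11:05. The 10:55 trip that arrives afterwards is dropped, and the 11:00 window stays open because no later trip has pushed the watermark past 12:00.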

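# Namespaced import avoids shadowing Python's built-in sum()
from pyspark.sql import functions as F

# Read the enriched silver table as a stream
silver_df = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_df = (
    silver_df
        # Keep the watermark 30 minutes behind the latest pickup time seen
        .withWatermark("tpep_pickup_datetime", "30 minutes")
        # One group per 1-hour tumbling window and region
        .groupBy(
            F.window("tpep_pickup_datetime", "1 hour"),
            "region"
        )
        .agg(
            F.count("*").alias("trip_count"),
            F.sum("fare_amount").alias("total_fare"),
            F.avg("trip_distance").alias("avg_distance")
        )
)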

# Stream the result to a gold Delta table
(
    gold_df.writeStream
        .option('mergeSchema', 'true')
        .trigger(availableNow=True)
        .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_metrics")
        .outputMode("append")
        .toTable("nyc_taxi.gold.taxi_metrics")
)
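
To make the shape of a gold row concrete, here is a minimal plain-Python sketch of the same three metrics computed over a handful of rows. The sample trips and the `aggregate` helper are hypothetical and only illustrate the grouping; the streaming query above is what actually populates the table.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample rows: (pickup time, region, fare_amount, trip_distance)
trips = [
    (datetime(2026, 1, 1, 10, 5), "Manhattan", 12.0, 2.0),
    (datetime(2026, 1, 1, 10, 40), "Manhattan", 8.0, 1.0),
    (datetime(2026, 1, 1, 10, 15), "Brooklyn", 20.0, 5.0),
    (datetime(2026, 1, 1, 11, 20), "Manhattan", 10.0, 3.0),
]


def aggregate(rows):
    """Group rows by (1-hour window start, region) and compute the metrics."""
    groups = defaultdict(list)
    for ts, region, fare, distance in rows:
        start = ts.replace(minute=0, second=0, microsecond=0)
        groups[(start, region)].append((fare, distance))

    result = {}
    for key, values in groups.items():
        fares = [fare for fare, _ in values]
        distances = [distance for _, distance in values]
        result[key] = {
            "trip_count": len(values),
            "total_fare": sum(fares),
            "avg_distance": sum(distances) / len(distances),
        }
    return result


metrics = aggregate(trips)
for (start, region), row in sorted(metrics.items()):
    print(start, region, row)
```

Each key corresponds to one row of the gold table: one window, one region, three metrics.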

Detailed Taxi‑Trip Metrics View

The gold layer can expose several views of the same data, each stored as its own table. The following example builds a more detailed set of taxi‑trip metrics, broken down by date, hour, pickup ZIP, and region.

# Namespaced import avoids shadowing Python's built-in sum()
from pyspark.sql import functions as F

# Read the enriched silver table as a stream
silver_stream = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_stream = (
    silver_stream
        .withWatermark("tpep_pickup_datetime", "30 minutes")
        # Derive dashboard-friendly date and hour columns from the pickup time
        .withColumn("trip_date", F.to_date("tpep_pickup_datetime"))
        .withColumn("trip_hour", F.hour("tpep_pickup_datetime"))
        .groupBy(
            F.window("tpep_pickup_datetime", "1 hour"),
            "trip_date",
            "trip_hour",
            "pickup_zip",
            "region"
        )
        .agg(
            F.count("*").alias("total_trips"),
            F.sum("fare_amount").alias("total_revenue"),
            F.avg("fare_amount").alias("avg_fare"),
            F.avg("trip_distance").alias("avg_distance")
        )
)

# Stream the result to a second gold Delta table; toTable() starts the query
gold_stream.writeStream \
    .format("delta") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_trip_metrics") \
    .outputMode("append") \
    .toTable("nyc_taxi.gold.taxi_trip_metrics")
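
One detail of this grouping is worth spelling out: trip_date and trip_hour are derived from the same timestamp as the 1‑hour window, so they should not split a window into more groups; only pickup_zip and region do. The plain-Python sketch below checks this on made-up timestamps. It assumes the hourly windows line up with the hour boundaries of the time zone used for trip_date and trip_hour, and `grouping_columns` is a hypothetical helper.

```python
from datetime import datetime

# Hypothetical pickup timestamps (naive datetimes, one time zone throughout)
pickups = [
    datetime(2026, 1, 1, 23, 5),
    datetime(2026, 1, 1, 23, 59),
    datetime(2026, 1, 2, 0, 0),
    datetime(2026, 1, 2, 0, 30),
]


def grouping_columns(ts):
    """Return (window_start, trip_date, trip_hour) for one pickup timestamp."""
    window_start = ts.replace(minute=0, second=0, microsecond=0)
    return window_start, ts.date(), ts.hour


# Collect the distinct (trip_date, trip_hour) pairs seen in each window.
pairs_per_window = {}
for ts in pickups:
    start, trip_date, trip_hour = grouping_columns(ts)
    pairs_per_window.setdefault(start, set()).add((trip_date, trip_hour))

# True when every window maps to exactly one (trip_date, trip_hour) pair.
one_pair_each = all(len(pairs) == 1 for pairs in pairs_per_window.values())
print(one_pair_each)
```

Under that assumption the two extra columns act as convenience fields for dashboards and do not change the granularity of the table.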

The data is now aggregated and available in gold Delta tables, ready for business insights.

Happy learning!
