第7部分:Gold Layer – Metrics、Watermarks 和 Aggregations
发布: (2026年1月2日 GMT+8 18:51)
2 min read
原文: Dev.to
Source: Dev.to

金表直接回答业务问题
示例
- 按地区统计每小时行程数
- 按邮编统计收入
- 按时间窗口计算平均距离
金表具备:
- 已聚合
- 已优化
- 可直接用于仪表盘
引入事件时间与 Watermark
为了让金层能够处理延迟数据,我们添加 Watermark 并使用窗口来基于事件时间正确关闭聚合。在下面的示例中,Spark 会在打开的 1 小时窗口中等待30 分钟,即从收到的最新事件起算 30 分钟。当 Watermark 阈值(max event tpep_pickup_datetime 收到的时间 – 30 分钟)超过窗口结束时间时,窗口关闭,聚合结果最终确定。
from pyspark.sql.functions import *
silver_df = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")
gold_df = (
silver_df
.withWatermark("tpep_pickup_datetime", "30 minutes")
.groupBy(
window("tpep_pickup_datetime", "1 hour"),
"region"
)
.agg(
count("*").alias("trip_count"),
sum("fare_amount").alias("total_fare"),
avg("trip_distance").alias("avg_distance")
)
)
# Stream the result to a gold Delta table
(
gold_df.writeStream
.option('mergeSchema', 'true')
.trigger(availableNow=True)
.option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_metrics")
.outputMode("append")
.toTable("nyc_taxi.gold.taxi_metrics")
)
详细的出租车行程指标视图
金层可以暴露多个视图。下面的示例创建了一个突出显示详细出租车行程指标的视图。
from pyspark.sql.functions import *
silver_stream = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")
gold_stream = (
silver_stream
.withWatermark("tpep_pickup_datetime", "30 minutes")
.withColumn("trip_date", to_date("tpep_pickup_datetime"))
.withColumn("trip_hour", hour("tpep_pickup_datetime"))
.groupBy(
window("tpep_pickup_datetime", "1 hour"),
"trip_date",
"trip_hour",
"pickup_zip",
"region"
)
.agg(
count("*").alias("total_trips"),
sum("fare_amount").alias("total_revenue"),
avg("fare_amount").alias("avg_fare"),
avg("trip_distance").alias("avg_distance")
)
)
gold_stream.writeStream \
.format("delta") \
.trigger(availableNow=True) \
.option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_trip_metrics") \
.outputMode("append") \
.table("nyc_taxi.gold.taxi_trip_metrics")
数据现在已经聚合并存放在金层 Delta 表中,随时可用于业务洞察。
祝学习愉快!