第7部分:Gold Layer – Metrics、Watermarks 和 Aggregations

发布: (2026年1月2日 GMT+8 18:51)
2 min read
原文: Dev.to

Source: Dev.to

第 7 部分封面图:金层 – 指标、Watermark 与聚合

金表直接回答业务问题

示例

  • 按地区统计每小时行程数
  • 按邮编统计收入
  • 按时间窗口计算平均距离

金表具备:

  • 已聚合
  • 已优化
  • 可直接用于仪表盘

引入事件时间与 Watermark

为了让金层能够处理延迟数据,我们添加 Watermark 并使用窗口来基于事件时间正确关闭聚合。在下面的示例中,Spark 会在打开的 1 小时窗口中等待30 分钟,即从收到的最新事件起算 30 分钟。当 Watermark 阈值(max event tpep_pickup_datetime 收到的时间 – 30 分钟)超过窗口结束时间时,窗口关闭,聚合结果最终确定。

from pyspark.sql.functions import *

silver_df = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_df = (
    silver_df
        .withWatermark("tpep_pickup_datetime", "30 minutes")
        .groupBy(
            window("tpep_pickup_datetime", "1 hour"),
            "region"
        )
        .agg(
            count("*").alias("trip_count"),
            sum("fare_amount").alias("total_fare"),
            avg("trip_distance").alias("avg_distance")
        )
)

# Stream the result to a gold Delta table
(
    gold_df.writeStream
        .option('mergeSchema', 'true')
        .trigger(availableNow=True)
        .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_metrics")
        .outputMode("append")
        .toTable("nyc_taxi.gold.taxi_metrics")
)

详细的出租车行程指标视图

金层可以暴露多个视图。下面的示例创建了一个突出显示详细出租车行程指标的视图。

from pyspark.sql.functions import *

silver_stream = spark.readStream.format("delta").table("nyc_taxi.silver.taxi_trips_enriched")

gold_stream = (
    silver_stream
        .withWatermark("tpep_pickup_datetime", "30 minutes")
        .withColumn("trip_date", to_date("tpep_pickup_datetime"))
        .withColumn("trip_hour", hour("tpep_pickup_datetime"))
        .groupBy(
            window("tpep_pickup_datetime", "1 hour"),
            "trip_date",
            "trip_hour",
            "pickup_zip",
            "region"
        )
        .agg(
            count("*").alias("total_trips"),
            sum("fare_amount").alias("total_revenue"),
            avg("fare_amount").alias("avg_fare"),
            avg("trip_distance").alias("avg_distance")
        )
)

gold_stream.writeStream \
    .format("delta") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/gold/taxi_trip_metrics") \
    .outputMode("append") \
    .table("nyc_taxi.gold.taxi_trip_metrics")

数据现在已经聚合并存放在金层 Delta 表中,随时可用于业务洞察。

祝学习愉快!

Back to Blog

相关文章

阅读更多 »