第6部分:Silver Layer – 清洗、丰富和维度

发布: (2026年1月2日 GMT+8 18:50)
2 min read
原文: Dev.to

Source: Dev.to

Part 6:银层 – 清洗、丰富和维度

银层通过以下方式将原始事件转换为可用于分析的记录:

  • 清理错误数据
  • 强制执行模式
  • 添加业务上下文
  • 应用维度建模

价值就在此产生。

数据清洗与类型强制

  • Bronze 层必须保持原样。
  • Silver 层确保数据正确性,并将错误与摄取过程隔离。
silver_stream = (
    spark.readStream
        .format("delta")
        .table("nyc_taxi.bronze.taxi_trips")
        .withColumn(
            "tpep_pickup_datetime",
            to_timestamp("tpep_pickup_datetime")
        )
        .withColumn(
            "fare_amount",
            col("fare_amount").cast("double")
        )
        .filter(col("fare_amount") > 0)
)

使用广播连接

为了实现所需的维度建模,需要与 zip_dim(一个相对较小的表)进行连接。将大表在执行器之间进行 shuffle 成本高昂,因此使用广播连接来提升性能。

未使用广播连接

Without Broadcast join

使用广播连接

With Broadcast join

添加水印

实时数据必须及时处理,但也需要处理迟到的数据。水印告诉 Spark 在最终确定结果之前,最多等待 30 分钟的迟到数据。

from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast

bronze_stream = spark.readStream.table("nyc_taxi.bronze.taxi_trips")
zip_dim = spark.read.table("nyc_taxi.raw.zip_dim")

silver_df = (
    bronze_stream
        .withColumn(
            "pickup_zip",
            regexp_replace("pickup_zip", "\.0$", "").cast("int")
        )
        .withColumn(
            "tpep_pickup_datetime",
            to_timestamp("tpep_pickup_datetime")
        )
        .withColumn(
            "tpep_dropoff_datetime",
            to_timestamp("tpep_dropoff_datetime")
        )
        .withWatermark("tpep_pickup_datetime", "30 minutes")
        .join(
            broadcast(zip_dim),
            bronze_stream.pickup_zip == zip_dim.zip_code,
            "left"
        )
        .select(
            "tpep_pickup_datetime",
            "tpep_dropoff_datetime",
            "trip_distance",
            "fare_amount",
            "pickup_zip",
            "region",
            "state"
        )
)

# 将流写入 Silver Delta 表
(
    silver_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/silver")
        .trigger(availableNow=True)
        .toTable("nyc_taxi.silver.taxi_trips_enriched")
)

所需的清洗和标准化已经完成,数据现在已准备好进行进一步成熟和业务洞察生成。

祝学习愉快!

Back to Blog

相关文章

阅读更多 »