第12天:UDF vs Pandas UDF

发布: (2025年12月12日 GMT+8 03:44)
3 min read
原文: Dev.to

Source: Dev.to

欢迎来到 Spark Mastery 系列的第 12 天!

UDF(用户自定义函数)会显著拖慢 Spark 作业——仅添加一个 UDF,运行时间就可能增加至 10 倍。了解其原因并避免这种陷阱至关重要。

UDF(用户自定义函数)

UDF 是应用于 Spark DataFrame 的 Python 函数。

from pyspark.sql.functions import udf

@udf("string")
def reverse_name(name):
    return name[::-1]

使用普通 UDF 时,Spark 必须:

  1. 将每条记录发送到 Python
  2. 执行 Python 代码
  3. 将结果转换回 JVM
  4. 将结果合并回 DataFrame

每条记录都要跨越 Python ↔ JVM 边界,这非常慢。

内置函数 — 始终首选

Spark 的原生函数是用 Scala 实现的,具备向量化并经过 Catalyst 优化。它们还支持谓词下推和列裁剪。

df.withColumn("upper_name", upper(col("name")))

**规则:**如果 Spark 已提供内置函数,绝不要自行编写 UDF。

Pandas UDF — 正规 UDF 的最佳替代方案

普通 UDF 逐行在 Python 中处理。
Pandas UDF 使用 Apache Arrow 对整批数据(向量化)进行操作,带来巨大的速度提升。

from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def multiply_by_two(col):
    return col * 2

Spark 以批次而非逐行的方式发送数据,从而实现显著的性能提升。

Pandas UDF 的类型

标量 Pandas UDF

@pandas_udf("double")
def add_one(col):
    return col + 1

分组映射 UDF

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def my_grouped_map(pdf):
    # 对每个分组的 pandas DataFrame 进行自定义转换
    return pdf

典型使用场景:

  • 时间序列转换
  • 按用户模型训练
  • 按分组清洗

分组聚合 UDF

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def my_grouped_agg(pdf):
    # 为每个分组返回单个聚合值
    return pdf.mean()

**适用场景:**统计聚合、机器学习指标等。

何时应使用普通 UDF?

仅在以下情况考虑使用普通 UDF:

  • 没有合适的内置函数可用
  • 操作无法向量化
  • 需要大量自定义 Python 逻辑

在典型的 ETL 流程中,这类情况很少见。

实际案例:性能差异

方法运行时间
普通 UDF50 秒
Pandas UDF8 秒
内置函数1 秒

如此鲜明的对比说明了高级工程师为何除非万不得已才会使用普通 UDF。

总结指南

  • 尽可能使用内置函数。
  • 对可向量化的自定义逻辑使用 Pandas UDF
  • 普通 UDF 仅保留给真正例外的情况。

遵循这些实践,你的 Spark 作业性能将会大幅提升。

Back to Blog

相关文章

阅读更多 »