๐ฅ 7์ผ ์ฐจ: PySpark Joins, Unions, ๊ทธ๋ฆฌ๊ณ GroupBy ๊ฐ์ด๋
Source: Dev.to
PySpark์์์ Join โ ETL ํ์ดํ๋ผ์ธ์ ํต์ฌ
Join์ ๋ DataFrame์ ํค๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ณํฉํ๋ ๊ฒ์ผ๋ก, SQL์ JOIN๊ณผ ์ ์ฌํฉ๋๋ค.
๊ธฐ๋ณธ Join
df.join(df2, df.id == df2.id, "inner")
๊ฐ์ ์ปฌ๋ผ ์ด๋ฆ์ผ๋ก Join
df.join(df2, ["id"], "left")
Spark์์ ์ง์ํ๋ Join ์ ํ
innerleft/left_outerright/right_outerfull/full_outerleft_semiโ ์กด์ฌ ์ฌ๋ถ ํ์ธ (์ค๋ฅธ์ชฝ์ ๋งค์น๊ฐ ์๋ ์ผ์ชฝ DataFrame์ ํ์ ๋ฐํ)left_antiโ ์ํฐโ์กฐ์ธ (์ค๋ฅธ์ชฝ์ ๋งค์น๊ฐ ์๋ ์ผ์ชฝ DataFrame์ ํ์ ๋ฐํ)
Union โ DataFrame์ ์์ง์ผ๋ก ์๊ธฐ
๋์ผ ์คํค๋ง, ๋์ผ ์์์ธ ๊ฒฝ์ฐ Union
df.union(df2)
์ปฌ๋ผ ์ด๋ฆ์ผ๋ก Union (์์๊ฐ ๋ค๋ฅธ ๊ฒฝ์ฐ)
df.unionByName(df2)
Union์ ์๋ณ ํ์ผ, ์ผ์ผ ์์ง ๋ฐ์ดํฐ์ , ํน์ ํํฐ์ ๋ ๋ฐ์ดํฐ๋ฅผ ๊ฒฐํฉํ ๋ ์ฌ์ฉํฉ๋๋ค.
GroupBy + Aggregation โ ๋น์ฆ๋์ค ๋ก์ง ๋ ์ด์ด
์์
df.groupBy("dept").agg(
sum("salary").alias("total_salary"),
avg("age").alias("avg_age")
)
count์ countDistinct
df.select(count("id"))
df.select(countDistinct("id"))
Approximate Distinct Count (๋น ๋ฅธ ๋ฐฉ๋ฒ)
df.select(approx_count_distinct("id"))
์ค์ ETL ์์ โ ๋งค์ถ ์ง๊ณ
Sales์ Products ์กฐ์ธ
df_joined = sales.join(products, "product_id", "left")
์นดํ ๊ณ ๋ฆฌ๋ณ ๋งค์ถ ์ง๊ณ
df_agg = df_joined.groupBy("category").agg(
sum("amount").alias("total_revenue"),
count("*").alias("transactions")
)
์ด ํจํด์ด ๋ฐ๋ก ๋น์ฆ๋์ค ๋์๋ณด๋๊ฐ ๊ตฌ์ถ๋๋ ๋ฐฉ์์ ๋๋ค.
Join ์ฑ๋ฅ ์ต์ ํ
์์ ์กฐํ ํ ์ด๋ธ์ ๋ํ Broadcast Join
df.join(broadcast(df_small), "id")
Broadcast Join์ ์ ํ์ ํผํ๋ฏ๋ก ์์ ์ด ํจ์ฌ ๋น ๋ฅด๊ฒ ์ํ๋ฉ๋๋ค.
์์ฝ
- Join (
left_semi์ฒดํฌ ํฌํจ) - Union /
unionByName groupBy์ ์ง๊ณ ํจ์๋คcount,countDistinct,approx_count_distinct- Broadcast Join ์ต์ ํ
๋น ์ง ๋ด์ฉ์ด ์์ผ๋ฉด ๋๊ธ๋ก ์๋ ค ์ฃผ์ธ์. ๊ฐ์ฌํฉ๋๋ค!