초보자를 위한 PySpark: 기본을 넘어
Source: Towards Data Science
이 시리즈 PySpark for Beginners: Mastering the Basics 를 통해 이제 Spark의 핵심인 분산 데이터, DataFrame, 그리고 지연 실행에 대해 이해하고 있습니다. PySpark를 설치하고, SparkSession을 시작했으며, CSV 파일을 읽고 DataFrame에서 간단한 조작을 수행했습니다. 해당 이야기에 대한 링크는 이 글의 마지막에 남겨두겠습니다.
원문 기사에서 한 번 더 강조하고 싶은 점은, 저는 종종 PySpark와 Spark를 혼용해서 사용하지만, 엄밀히 말하면 Spark는 (Scala로 작성된) 전체 분산 컴퓨팅 프레임워크이고, PySpark는 Spark를 위한 전용 Python API라는 것입니다.
Beyond the basics
이제 초급 단계에서 한 걸음 더 나아가면 흥미로운 일이 벌어집니다. 두 번째 PySpark 프로젝트에서는 약간 다른 사고방식이 필요하다는 것을 곧 깨닫게 됩니다:
- 데이터를 더 안전하고, 빠르며, 예측 가능하게 읽고/쓰고 싶다.
- 조인에 대한 불확실함 없이 데이터셋을 결합하고 싶다.
- Spark가 왜 그런 식으로 동작하는지, 그리고 어떻게 하면 올바른 방향으로 부드럽게 유도할 수 있는지 이해하고 싶다.
이 글은 그런 다음 단계들을 차근차근 안내합니다. 의도적으로 느리게, 실용적으로 접근합니다. 깊은 내부 구조는 다루지 않으며, 클러스터 튜닝이나 복잡한 Spark 최적화도 다루지 않습니다. 장난감 예제에서 작은 실무 프로젝트로 넘어갈 때 초보자가 꼭 알아야 할 내용만을 다룹니다.
우리는 앞서와 마찬가지로 오픈소스 Spark를 로컬에서 실행합니다.
1. 한 단계 더 나아가기: 데이터를 올바르게 읽기
첫 번째 글에서는 가장 간단한 CSV 로더를 사용했습니다:
df = spark.read.csv("sales.csv", header=True, inferSchema=True)
동작은 하며 초기 실험에는 괜찮지만, 미묘한 문제를 숨기고 있습니다.
Spark가 데이터 타입을 추측하고 있다
inferSchema=True 옵션을 사용하면 Spark는 파일의 작은 샘플을 살펴보고 각 컬럼이 정수, 문자열, 불리언, 혹은 실수인지 추측합니다. 즉:
- 99개의 행이 숫자로 보이고 100번째 행이 비어 있다면, Spark는 해당 컬럼을 문자열로 해석할 수 있습니다.
- 누군가가 다음 주에 파일을 편집하면서
23.50대신£23.50을 넣으면, Spark는 전체 컬럼을 다르게 처리할 수 있습니다. - 파일이 크면, Spark가 사용하는 샘플이 전체 데이터를 대표하지 못할 수 있습니다.
이러한 상황은 나중에 신비한 동작을 초래하고, 초보자들이 가장 진단하기 어려운 버그를 만들게 됩니다.
초보자를 위한 더 나은 습관: 스키마 정의하기
스키마는 데이터를 읽기 위한 Spark의 설계도와 같습니다. 무언가를 만들기 전에, Spark에게 다음과 같은 정보를 알려줍니다:
- 컬럼 이름
- 각 컬럼이 가져야 할 데이터 타입
- 컬럼 값이 선택적인지 여부
아래는 우리 sales 데이터 예시의 스키마 정의 모습입니다. 데이터는 다음과 같았던 것을 기억하세요:
transaction_id,customer_name,net_amount,tax_amount, is_member
101,Alice,250.50,25.05,true
102,Bob,120.00,6.00, false
103,Charlie,450.75,25.07,true
104,David,89.99,5.73,false
위 필드들의 타입을 Spark에 지정하려면 다음과 같이 코드를 작성합니다.
from pyspark.sql import types as T
schema = T.StructType([
T.StructField("transaction_id", T.IntegerType(), False),
T.StructField("customer_name", T.StringType(), False),
T.StructField("net_amount", T.DoubleType(), True),
T.StructField("tax_amount", T.DoubleType(), True),
T.StructField("is_member", T.BooleanType(), True),
])
컬럼 이름과 타입 파라미터는 직관적입니다. True/False 파라미터는 해당 컬럼에 NULL 값이 허용되는지 여부를 나타냅니다. 이 nullable 플래그는 주로 스키마 메타데이터와 최적화 정보를 위한 것이며, 데이터베이스의 NOT NULL 제약처럼 모든 데이터 소스에 엄격히 적용되지는 않습니다.
CSV 데이터를 읽을 때 유용한 옵션들
스키마와 함께 사용할 수 있는 편리한 CSV 읽기 옵션이 여러 가지 있습니다. 흔히 쓰이는 옵션은 다음과 같습니다:
mode="PERMISSIVE": 가능한 한 많은 잘못된 행을 유지mode="DROPMALFORMED": 형식이 맞지 않는 행을 삭제mode="FAILFAST": 오류가 발생하면 즉시 중단header=True/False: 파일에 헤더 레코드가 있는지 여부nullValue: 입력에서null로 간주할 텍스트 지정dateFormat/timestampFormat
이제 아래와 같이 sales_data.csv 를 DataFrame으로 로드할 수 있습니다:
df = (
spark.read
.option("header", True)
# 다른 모드: "PERMISSIVE", "DROPMALFORMED"
.option("mode", "FAILFAST")
.option("nullValue", "N/A")
.schema(schema)
.csv("sales_data.csv")
)
초보자에게 왜 중요한가?
- 작업을 시작하기 전에 데이터 타입을 정확히 알 수 있습니다.
- 스키마가 지정돼 있으면 Spark가 이상한 행을 거부하고, 조용히 해석하지 않습니다.
- 변환 과정이 예측 가능해집니다.
- 나중에 두 데이터셋을 조인할 때 타입 불일치로 인한 놀라움을 방지합니다.
2. 데이터 변환 이해하기
이전 글에서 PySpark로 DataFrame을 다루는 첫 단계에서, 아래와 같이 파생 컬럼을 추가한 적이 있었습니다:
df2 = df.withColumn("gross_amount", df.net_amount + df.tax_amount)
이 라인은 아직 실제 계산을 수행하지 않습니다. Spark 내부 계획에 단계를 추가하는 것뿐입니다:
1. CSV 읽기
2. 새로운 컬럼 추가 (gross_amount = net + tax)
그 뒤에 또 다른 단계들을 추가할 수 있습니다:
df3 = df2.withColumn("tax_percentage", df2.tax_amount / df2.gross_amount * 100)
하지만 아직 계산은 일어나지 않았습니다. 액션을 수행할 때 비로소 Spark가 모든 단계를 실행합니다:
df3.show()
Spark는 다음과 같이 말합니다:
“좋아, 이제 실제로 이 모든 단계를 실행해야겠어.”
이것이 지연 실행(lazy execution) 이라는 개념이며, 초보자에게 중요한 점은 이름이 아니라 효과입니다. 즉,
- 결과가 필요할 때까지 여러 변환을 연쇄해도 비용이 발생하지 않는다.
- Spark는 내부적으로 순서를 재배열해 효율적으로 실행한다.
- 나중에 필터링해서 버릴 데이터에 대해 중간 단계에 시간을 낭비하지 않는다.
일상적인 작업에 비유하면 다음과 같습니다:
- 모든 재료를 모은다.
- 머릿속으로 조합을 구상한다.
- 실제로 만들게 될 것이 확정되면 비로소 재료를 손질하고 준비한다.
3. 문제를 일으키기 전에 데이터 정리하기
실제 데이터는 보통 지저분하고, 결측값, 빈 문자열, 중복 레코드, 혹은 “N/A”, “unknown” 같은 플레이스홀더 값을 포함합니다.
PySpark에서는 명확한 문제를 초기에 잡아내 나머지 워크플로가 예측 가능하도록 하는 것이 목표입니다. 이를 위해 PySpark는 여러 유용한 함수를 제공합니다.
결측값이 있는 행 삭제하기
가장 간단한 정리 함수는 dropna() 입니다.
df_clean = df.dropna()
이 코드는 어느 컬럼에든 null 값이 있는 행을 모두 제거합니다. 경우에 따라 너무 과격할 수 있습니다.
보통은 중요한 컬럼에 결측값이 있는 행만 삭제합니다:
df_clean = df.dropna(subset=["net_amount", "tax_amount"])
즉, net_amount와 tax_amount가 모두 존재하는 행만 유지하고, 다른 컬럼에 null 이 있더라도 허용합니다.
결측값 채우기
때로는 행을 삭제하고 싶지 않을 때가 있습니다. 이때는 결측값을 의미 있는 값으로 대체합니다. fillna() 가 여기서 유용합니다.
df_clean = df.fillna({"city": "Unknown"})
숫자 컬럼도 채울 수 있습니다:
df_clean = df.fillna({"tax_amount": 0.0})
예를 들어, 할인 금액이 누락된 경우 0.0 으로 채우는 것이 합리적일 수 있습니다. 하지만 잘못된 기본값을 선택하면 데이터 의미가 바뀔 수 있으니 주의하세요.
cast() 로 컬럼 타입 변경하기
CSV 파일을 읽을 때 Spark가 컬럼을 잘못된 타입으로 인식하는 경우가 있습니다. 이때는 cast() 연산자를 사용해 컬럼 타입을 변환합니다: