데이터 오케스트레이션을 위한 Dagster 사용해 보기

발행: (2025년 12월 2일 오전 08:33 GMT+9)
7 min read
원문: Dev.to

Source: Dev.to

Intro

파이프라인에 바로 들어가기 전에 간단한 소개와 Dagster가 가지고 있는 가장 눈에 띄는 차별점인 Assets를 강조하고 싶습니다. Dagster에서는 파이프라인을 흐름, 연산자, DAG가 아니라 실제 데이터 엔터티(자산)를 중심으로 모델링합니다. 데이터를 추출하고 S3에 로드하는 절차적 단계는 별도의 작업이 아니라 하나의 asset으로 모델링됩니다. 이는 Dagster를 선언형에 가깝게 만들며, Prefect와 Airflow의 작업 기반, 절차적 접근 방식과는 대조됩니다.

Dagster는 op을 통해 비자산 방식도 지원하지만, 복잡성이 op을 요구하지 않는 한 자산을 사용하는 것이 권장됩니다. 이 부분은 나중에 다시 다루겠습니다.

Data Pipeline Architecture

weather_etl 프로젝트에는 두 개의 데이터셋—actual weatherforecast—이 있으며, 둘 다 Open‑Meteo API(히스토리 포함)에서 가져와 S3에 Parquet 파일로 저장합니다. 변환 단계에서는 pandas의 빠른 대안인 Polars를 사용합니다. 최종 Dagster 자산 그래프는 다음과 같습니다:

Assets

Assets

Dagster에서 자산을 정의하는 것은 간단합니다: 함수에 @asset 데코레이터를 붙이면 됩니다. 파이프라인은 두 층으로 구성됩니다:

  • raw – Open‑Meteo API에서 데이터를 가져와 그대로 S3에 저장하는 자산.
  • staged – 원시 데이터를 정규화하고 분석에 적합한 테이블로 변환하는 자산.

Raw Assets

historical_forecast

월별 파티션을 가진 자산으로, 2023년 1월부터 과거 날씨 예보 데이터를 월 단위로 가져옵니다. 월 파티션을 사용하면 수년치 데이터를 독립적으로 처리할 수 있어 파이프라인이 실패에 강하고 특정 기간만 재처리하기 쉽습니다.

import dagster as dg
from datetime import datetime
from dagster import AssetExecutionContext, RetryPolicy, MetadataValue, AssetIn
from pendulum import UTC

monthly_partition = dg.MonthlyPartitionsDefinition(
    # Backfill the data from Jan 2023 onwards
    start_date=datetime(2023, 1, 1, tzinfo=UTC),
)

@dg.asset(
    key_prefix=["raw"],
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    metadata={
        "owner": "ybryz",
        "api-url": MetadataValue.url(HISTORICAL_API_URL),
        "endpoint": "/forecast",
    },
    kinds={"s3"},
    partitions_def=monthly_partition,
)
def historical_forecast(
    context: AssetExecutionContext,
    weather_api_client: WeatherApiClient,
    s3: S3Resource,
):
    ...

hourly_forecast

UTC 06:00에 매일 실행되며, 지정된 도시(NYC, Philadelphia, Chicago, DC)의 7일 예보를 가져옵니다. AutomationCondition.on_cron()을 사용해 매일 아침 최신 데이터를 확보합니다.

@dg.asset(
    key_prefix=["raw"],
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    metadata={
        "owner": "ybryz",
        "api-url": MetadataValue.url(API_URL),
        "endpoint": "/forecast",
    },
    kinds={"s3"},
    automation_condition=dg.AutomationCondition.on_cron("0 6 * * *"),  # Daily at 6 AM UTC
)
def hourly_forecast(
    weather_api_client: WeatherApiClient,
    s3: S3Resource,
):
    ...

historical_actual_weather

historical_forecast와 유사하지만 관측된 날씨 데이터를 Archive API에서 가져옵니다. 역시 월 파티션을 사용하며, Archive API가 5일 지연을 두고 있기 때문에 5일 전 이상의 데이터만 조회할 수 있습니다.

actual_weather

5일마다 실행되어 최근 관측 데이터(10일 전부터 5일 전까지)를 가져와 API 지연에도 데이터셋을 최신 상태로 유지합니다.

모든 raw 자산은 공통 패턴을 따릅니다: WeatherApiClient를 호출해 데이터를 가져오고, Polars DataFrame으로 변환한 뒤, lineage를 위해 extracted_at 타임스탬프를 추가하고, 월별로 정리된 Parquet 파일 형태로 S3에 업로드합니다. 각 자산은 일시적인 API 오류를 대비해 최대 3회 재시도(10초 간격) 정책을 포함합니다.

Staged Assets

hourly_forecast_table

historical_forecasthourly_forecast에 의존하는 변환 자산입니다. S3에 저장된 모든 원시 예보 Parquet 파일을 읽어 들여, 중첩된 JSON을 시간 배열로 펼치고, 좌표를 도시 코드에 매핑한 뒤, 도시와 타임스탬프 기준으로 중복을 제거합니다. 새 상위 데이터가 도착하면 자동으로 실행되도록 AutomationCondition.eager()를 사용합니다.

@dg.asset(
    key_prefix=["staged"],
    group_name="weather_etl",
    kinds={"polars"},
    deps=[historical_forecast, hourly_forecast],
    automation_condition=dg.AutomationCondition.eager(),
)
def hourly_forecast_table():
    ...

actual_weather_table

관측된 날씨 데이터에 대한 대응 자산입니다. historical_actual_weatheractual_weather에 의존하며, 공유된 transform_weather_data() 함수를 통해 동일한 정규화 로직을 적용합니다.

@dg.asset(
    key_prefix=["staged"],
    group_name="weather_etl",
    kinds={"polars"},
    deps=[historical_actual_weather, actual_weather],
    automation_condition=dg.AutomationCondition.eager(),
)
def actual_weather_table():
    ...

두 staged 자산 모두 테이블 스키마와 행 수와 같은 풍부한 메타데이터를 포함하고 있어 Dagster UI를 통해 데이터 품질을 모니터링하고 이상 징후를 쉽게 파악할 수 있습니다.

Resources and Dependency Injection

Dagster의 리소스 시스템은 의존성 주입을 깔끔하게 처리하여 전역 상태나 흩어진 설정을 피합니다. 이 파이프라인에서는 WeatherApiClient 리소스가 Open‑Meteo API를 감싸고 있습니다:

class WeatherApiClient:
    def __init__(self, base_url: str, api_key: str | None = None):
        self.base_url = base_url
        self.api_key = api_key

    def fetch(self, endpoint: str, params: dict) -> dict:
        # Implementation that performs HTTP GET and returns JSON
        ...

# Register the resource in the Dagster definitions
weather_api_client_resource = dg.ResourceDefinition(
    resource_fn=lambda _: WeatherApiClient(base_url="https://api.open-meteo.com")
)

weather_api_client_resourceS3Resource 같은 리소스는 해당 자산에 주입되어 코드가 모듈화되고 테스트하기 쉬워집니다.


Full pipeline source:

Back to Blog

관련 글

더 보기 »

core.async: 심층 탐구 — 온라인 밋업

이벤트 개요: 12월 10일 GMT+1 기준 18:00에 Health Samurai가 온라인 밋업 “core.async: Deep Dive”를 주최합니다. 이번 강연은 clojure.core의 내부를 파헤칩니다....