Trying Out Dagster for Data Orchestration

Published: December 1, 2025 at 06:33 PM EST
4 min read
Source: Dev.to

Intro

I can’t jump right into the pipeline without a brief intro and highlighting the most obvious differentiating factor that Dagster has – Assets. In Dagster you model your pipeline around the actual data entities (assets) rather than flows, operators, or DAGs. Procedural steps such as extracting data and loading it into S3 are modeled as a single asset instead of separate tasks. This puts Dagster in a more declarative camp, with assets at the forefront, versus the task‑based, procedural approaches of Prefect and Airflow.

Dagster also supports the non‑asset way of doing things via an op, but using assets is the recommended practice unless the complexity requires an op. I’ll return to that later.
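
To make the distinction concrete, here is a minimal sketch (the names are illustrative, not from the weather project): the same fetch step expressed once as an asset and once as an op inside a job.

import dagster as dg

# Asset-style: the function *is* the data it produces; Dagster tracks it
# as a materialized asset with lineage.
@dg.asset
def city_temperatures() -> list[dict]:
    return [{"city": "NYC", "temp_c": 3.2}]

# Op/job-style: a procedural step that only exists inside a job.
@dg.op
def fetch_city_temperatures() -> list[dict]:
    return [{"city": "NYC", "temp_c": 3.2}]

@dg.job
def weather_job():
    fetch_city_temperatures()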

Data Pipeline Architecture

The weather_etl project has two datasets – actual weather and forecast – both sourced (including history) from the Open‑Meteo API and stored in S3 as Parquet files. The transformation step uses Polars, a fast alternative to pandas. The resulting Dagster asset graph looks like this:

(asset graph screenshot)
Assets

Defining assets in Dagster is straightforward: just apply the @asset decorator to a function. The pipeline is organized into two layers:

  • raw – assets that fetch data from the Open‑Meteo API and store it unchanged in S3.
  • staged – assets that transform the raw data into normalized, analytics‑ready tables.

Raw Assets

historical_forecast

A partitioned asset that fetches historical weather forecast data month‑by‑month, starting from January 2023. Monthly partitioning enables independent processing of years of data, making the pipeline resilient to failures and easy to reprocess specific periods.

import dagster as dg
from datetime import datetime
from dagster import AssetExecutionContext, MetadataValue, RetryPolicy
from dagster_aws.s3 import S3Resource
from pendulum import UTC

# HISTORICAL_API_URL and WeatherApiClient are defined elsewhere in the project
monthly_partition = dg.MonthlyPartitionsDefinition(
    # Backfill the data from Jan 2023 onwards
    start_date=datetime(2023, 1, 1, tzinfo=UTC),
)

@dg.asset(
    key_prefix=["raw"],
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    metadata={
        "owner": "ybryz",
        "api-url": MetadataValue.url(HISTORICAL_API_URL),
        "endpoint": "/forecast",
    },
    kinds={"s3"},
    partitions_def=monthly_partition,
)
def historical_forecast(
    context: AssetExecutionContext,
    weather_api_client: WeatherApiClient,
    s3: S3Resource,
):
    ...

hourly_forecast

Runs daily at 06:00 UTC and fetches the 7‑day forecast for configured cities (NYC, Philadelphia, Chicago, DC). It uses AutomationCondition.on_cron() to ensure fresh data each morning.

@dg.asset(
    key_prefix=["raw"],
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    metadata={
        "owner": "ybryz",
        "api-url": MetadataValue.url(API_URL),
        "endpoint": "/forecast",
    },
    kinds={"s3"},
    automation_condition=dg.AutomationCondition.on_cron("0 6 * * *"),  # Daily at 6 AM UTC
)
def hourly_forecast(
    weather_api_client: WeatherApiClient,
    s3: S3Resource,
):
    ...

historical_actual_weather

Similar to historical_forecast but fetches observed weather data from the Archive API. It is also monthly partitioned and respects the Archive API’s 5‑day delay, meaning it can only retrieve data from 5 days ago or earlier.

actual_weather

Runs every 5 days, pulling recent observed data (10 days ago to 5 days ago) to keep the dataset current despite the API delay.
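
The post doesn't show the code for this asset, so here is a rough sketch of how the rolling window and cadence could be expressed; the cron expression and date arithmetic are my assumptions, not the project's actual implementation.

from datetime import date, timedelta

import dagster as dg

@dg.asset(
    key_prefix=["raw"],
    retry_policy=dg.RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    kinds={"s3"},
    # Assumed cadence: every 5 days at 06:00 UTC
    automation_condition=dg.AutomationCondition.on_cron("0 6 */5 * *"),
)
def actual_weather(
    weather_api_client: WeatherApiClient,  # same resources as the other raw assets
    s3: S3Resource,
):
    # Respect the Archive API's 5-day delay: fetch observations from
    # 10 days ago up to 5 days ago.
    end_date = date.today() - timedelta(days=5)
    start_date = end_date - timedelta(days=5)
    ...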

All raw assets share a common pattern: they call WeatherApiClient to fetch data, convert it to a Polars DataFrame, add an extracted_at timestamp for lineage, and upload the result to S3 as Parquet files organized by month. Each asset includes a retry policy (up to 3 retries with a 10‑second delay) to handle transient API failures.
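
As a sketch of that shared pattern, the body of a partitioned raw asset might look roughly like the following; the helper name, bucket, and key layout are assumptions rather than the project's actual code.

import io
from datetime import datetime, timezone

import polars as pl

def load_raw_weather(context, weather_api_client, s3, endpoint: str) -> pl.DataFrame:
    # Bound the API request by the asset's monthly partition window
    window = context.partition_time_window
    payload = weather_api_client.fetch(
        endpoint,
        params={
            "start_date": window.start.strftime("%Y-%m-%d"),
            "end_date": window.end.strftime("%Y-%m-%d"),
        },
    )

    # Convert the JSON payload to Polars and stamp it for lineage
    df = pl.DataFrame(payload).with_columns(
        pl.lit(datetime.now(timezone.utc)).alias("extracted_at")
    )

    # Upload as Parquet, organized by month (bucket and key layout are assumed)
    buf = io.BytesIO()
    df.write_parquet(buf)
    s3.get_client().put_object(
        Bucket="weather-etl",
        Key=f"raw/{endpoint.strip('/')}/{window.start:%Y-%m}.parquet",
        Body=buf.getvalue(),
    )
    return df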

Staged Assets

hourly_forecast_table

A transformation asset that depends on historical_forecast and hourly_forecast. It reads all raw forecast Parquet files from S3, normalizes the nested JSON by exploding hourly arrays, maps coordinates to city codes, and deduplicates records by city and timestamp. The asset uses AutomationCondition.eager() to run automatically whenever new upstream data arrives.

@dg.asset(
    key_prefix=["staged"],
    group_name="weather_etl",
    kinds={"polars"},
    deps=[historical_forecast, hourly_forecast],
    automation_condition=dg.AutomationCondition.eager(),
)
def hourly_forecast_table():
    ...

actual_weather_table

The counterpart for observed weather data. It depends on historical_actual_weather and actual_weather and applies the same normalization logic via a shared transform_weather_data() function.

@dg.asset(
    key_prefix=["staged"],
    group_name="weather_etl",
    kinds={"polars"},
    deps=[historical_actual_weather, actual_weather],
    automation_condition=dg.AutomationCondition.eager(),
)
def actual_weather_table():
    ...
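
The shared transform_weather_data() function isn't shown in the post; a rough Polars sketch of the steps it describes (explode hourly arrays, map coordinates to city codes, deduplicate by city and timestamp) could look like this. The column names and the city mapping are assumptions.

import polars as pl

# Hypothetical (lat, lon) -> city code mapping
CITY_BY_COORDS = {
    (40.71, -74.01): "NYC",
    (39.95, -75.17): "PHL",
    (41.88, -87.63): "CHI",
    (38.90, -77.04): "DC",
}

def transform_weather_data(raw: pl.DataFrame) -> pl.DataFrame:
    return (
        raw
        # Explode the per-response hourly arrays into one row per timestamp
        .explode(["time", "temperature_2m", "precipitation"])
        # Map coordinates to a city code
        .with_columns(
            pl.struct(["latitude", "longitude"])
            .map_elements(
                lambda s: CITY_BY_COORDS.get(
                    (round(s["latitude"], 2), round(s["longitude"], 2))
                ),
                return_dtype=pl.Utf8,
            )
            .alias("city")
        )
        # Keep the latest extraction for each (city, timestamp) pair
        .sort("extracted_at", descending=True)
        .unique(subset=["city", "time"], keep="first")
    )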

Both staged assets include rich metadata such as table schemas and row counts, making it easy to monitor data quality and spot anomalies through the Dagster UI.
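
One way to attach that metadata (a sketch, not necessarily how the project does it) is a small helper that builds a MaterializeResult from the transformed DataFrame and is returned from the asset body:

import dagster as dg
import polars as pl

def staged_table_result(df: pl.DataFrame) -> dg.MaterializeResult:
    # Surface the row count and column schema in the Dagster UI
    return dg.MaterializeResult(
        metadata={
            "dagster/row_count": len(df),
            "columns": dg.MetadataValue.table_schema(
                dg.TableSchema(
                    columns=[
                        dg.TableColumn(name=name, type=str(dtype))
                        for name, dtype in df.schema.items()
                    ]
                )
            ),
        }
    )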

Resources and Dependency Injection

Dagster’s resource system cleanly handles dependency injection, avoiding global state or scattered configuration. In this pipeline a WeatherApiClient resource wraps the Open‑Meteo API:

class WeatherApiClient:
    def __init__(self, base_url: str, api_key: str | None = None):
        self.base_url = base_url
        self.api_key = api_key

    def fetch(self, endpoint: str, params: dict) -> dict:
        # Implementation that performs HTTP GET and returns JSON
        ...

# Register the resource in the Dagster definitions
weather_api_client_resource = dg.ResourceDefinition(
    resource_fn=lambda _: WeatherApiClient(base_url="https://api.open-meteo.com")
)

Resources like weather_api_client_resource and S3Resource are injected into assets that need them, keeping the code modular and testable.
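
The wiring happens by parameter name in the project's Definitions object. A rough sketch (the resource keys must match the asset function arguments; the S3 configuration is a placeholder):

import dagster as dg
from dagster_aws.s3 import S3Resource

# Assets are imported from the project's modules
defs = dg.Definitions(
    assets=[
        historical_forecast,
        hourly_forecast,
        historical_actual_weather,
        actual_weather,
        hourly_forecast_table,
        actual_weather_table,
    ],
    resources={
        # Keys match the parameter names used in the asset functions
        "weather_api_client": weather_api_client_resource,
        "s3": S3Resource(region_name="us-east-1"),
    },
)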


Full pipeline source:
