尝试使用 Dagster 进行数据编排

发布: (2025年12月2日 GMT+8 07:33)
6 min read
原文: Dev.to

Source: Dev.to

Intro

我不能在没有简短介绍的情况下直接进入管道,并且要强调 Dagster 最明显的差异化因素——资产(Assets)。在 Dagster 中,你围绕实际的数据实体(资产)来建模管道,而不是围绕流、操作符或 DAG。诸如提取数据并将其加载到 S3 之类的过程步骤被建模为单个 asset,而不是分开的任务。这使得 Dagster 更倾向于声明式风格,以资产为核心,而不是像 Prefect 和 Airflow 那样的基于任务、过程化的方法。

Dagster 也支持通过 op 的非资产方式,但除非复杂性需要使用 op,否则推荐使用资产。稍后我会再回到这个话题。

Data Pipeline Architecture

weather_etl 项目有两个数据集——实际天气预测——均来自 Open‑Meteo API(包括历史数据),并以 Parquet 文件形式存储在 S3 中。转换步骤使用 Polars,它是 pandas 的高速替代品。生成的 Dagster 资产图如下所示:

Assets

Assets

在 Dagster 中定义资产非常直接:只需在函数上使用 @asset 装饰器。管道分为两层组织:

  • raw – 从 Open‑Meteo API 获取数据并原样存储到 S3 的资产。
  • staged – 将原始数据转换为规范化、可用于分析的表的资产。

Raw Assets

historical_forecast

一个分区资产,按月获取历史天气预报数据,起始于 2023 年 1 月。月度分区使得多年数据可以独立处理,使管道对故障具有弹性,并且可以轻松重新处理特定时间段。

import dagster as dg
from datetime import datetime
from dagster import AssetExecutionContext, RetryPolicy, MetadataValue, AssetIn
from pendulum import UTC

monthly_partition = dg.MonthlyPartitionsDefinition(
    # Backfill the data from Jan 2023 onwards
    start_date=datetime(2023, 1, 1, tzinfo=UTC),
)

@dg.asset(
    key_prefix=["raw"],
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    metadata={
        "owner": "ybryz",
        "api-url": MetadataValue.url(HISTORICAL_API_URL),
        "endpoint": "/forecast",
    },
    kinds={"s3"},
    partitions_def=monthly_partition,
)
def historical_forecast(
    context: AssetExecutionContext,
    weather_api_client: WeatherApiClient,
    s3: S3Resource,
):
    ...

hourly_forecast

每天 06:00 UTC 运行,获取配置城市(NYC、Philadelphia、Chicago、DC)的 7 天预报。使用 AutomationCondition.on_cron() 确保每天早晨都有最新数据。

@dg.asset(
    key_prefix=["raw"],
    retry_policy=RetryPolicy(max_retries=3, delay=10),
    group_name="weather_etl",
    metadata={
        "owner": "ybryz",
        "api-url": MetadataValue.url(API_URL),
        "endpoint": "/forecast",
    },
    kinds={"s3"},
    automation_condition=dg.AutomationCondition.on_cron("0 6 * * *"),  # Daily at 6 AM UTC
)
def hourly_forecast(
    weather_api_client: WeatherApiClient,
    s3: S3Resource,
):
    ...

historical_actual_weather

historical_forecast 类似,但从 Archive API 获取观测到的天气数据。它同样采用月度分区,并遵守 Archive API 的 5 天延迟,即只能检索 5 天前或更早的数据。

actual_weather

每 5 天运行一次,拉取最近的观测数据(10 天前至 5 天前),以在 API 延迟的情况下保持数据集的时效性。

所有原始资产遵循相同的模式:调用 WeatherApiClient 获取数据,将其转换为 Polars DataFrame,添加 extracted_at 时间戳以记录血缘,然后以按月组织的 Parquet 文件形式上传到 S3。每个资产都包含重试策略(最多 3 次重试,间隔 10 秒),以处理瞬时的 API 失败。

Staged Assets

hourly_forecast_table

一个转换资产,依赖 historical_forecasthourly_forecast。它读取 S3 中所有原始预报 Parquet 文件,展开嵌套的 JSON(按小时数组),将坐标映射到城市代码,并按城市和时间戳去重。该资产使用 AutomationCondition.eager(),在上游有新数据时自动运行。

@dg.asset(
    key_prefix=["staged"],
    group_name="weather_etl",
    kinds={"polars"},
    deps=[historical_forecast, hourly_forecast],
    automation_condition=dg.AutomationCondition.eager(),
)
def hourly_forecast_table():
    ...

actual_weather_table

观测天气数据的对应资产。它依赖 historical_actual_weatheractual_weather,并通过共享的 transform_weather_data() 函数应用相同的规范化逻辑。

@dg.asset(
    key_prefix=["staged"],
    group_name="weather_etl",
    kinds={"polars"},
    deps=[historical_actual_weather, actual_weather],
    automation_condition=dg.AutomationCondition.eager(),
)
def actual_weather_table():
    ...

两个 staged 资产都包含丰富的元数据,如表结构和行数,便于通过 Dagster UI 监控数据质量并发现异常。

Resources and Dependency Injection

Dagster 的资源系统干净地处理依赖注入,避免全局状态或分散的配置。在本管道中,WeatherApiClient 资源封装了 Open‑Meteo API:

class WeatherApiClient:
    def __init__(self, base_url: str, api_key: str | None = None):
        self.base_url = base_url
        self.api_key = api_key

    def fetch(self, endpoint: str, params: dict) -> dict:
        # Implementation that performs HTTP GET and returns JSON
        ...

# Register the resource in the Dagster definitions
weather_api_client_resource = dg.ResourceDefinition(
    resource_fn=lambda _: WeatherApiClient(base_url="https://api.open-meteo.com")
)

weather_api_client_resourceS3Resource 这样的资源会被注入到需要它们的资产中,使代码保持模块化且易于测试。


Full pipeline source:

Back to Blog

相关文章

阅读更多 »