尝试使用 Dagster 进行数据编排
Source: Dev.to
Intro
我不能在没有简短介绍的情况下直接进入管道,并且要强调 Dagster 最明显的差异化因素——资产(Assets)。在 Dagster 中,你围绕实际的数据实体(资产)来建模管道,而不是围绕流、操作符或 DAG。诸如提取数据并将其加载到 S3 之类的过程步骤被建模为单个 asset,而不是分开的任务。这使得 Dagster 更倾向于声明式风格,以资产为核心,而不是像 Prefect 和 Airflow 那样的基于任务、过程化的方法。
Dagster 也支持通过 op 的非资产方式,但除非复杂性需要使用 op,否则推荐使用资产。稍后我会再回到这个话题。
Data Pipeline Architecture
weather_etl 项目有两个数据集——实际天气和预测——均来自 Open‑Meteo API(包括历史数据),并以 Parquet 文件形式存储在 S3 中。转换步骤使用 Polars,它是 pandas 的高速替代品。生成的 Dagster 资产图如下所示:
Assets
在 Dagster 中定义资产非常直接:只需在函数上使用 @asset 装饰器。管道分为两层组织:
- raw – 从 Open‑Meteo API 获取数据并原样存储到 S3 的资产。
- staged – 将原始数据转换为规范化、可用于分析的表的资产。
Raw Assets
historical_forecast
一个分区资产,按月获取历史天气预报数据,起始于 2023 年 1 月。月度分区使得多年数据可以独立处理,使管道对故障具有弹性,并且可以轻松重新处理特定时间段。
import dagster as dg
from datetime import datetime
from dagster import AssetExecutionContext, RetryPolicy, MetadataValue, AssetIn
from pendulum import UTC
monthly_partition = dg.MonthlyPartitionsDefinition(
# Backfill the data from Jan 2023 onwards
start_date=datetime(2023, 1, 1, tzinfo=UTC),
)
@dg.asset(
key_prefix=["raw"],
retry_policy=RetryPolicy(max_retries=3, delay=10),
group_name="weather_etl",
metadata={
"owner": "ybryz",
"api-url": MetadataValue.url(HISTORICAL_API_URL),
"endpoint": "/forecast",
},
kinds={"s3"},
partitions_def=monthly_partition,
)
def historical_forecast(
context: AssetExecutionContext,
weather_api_client: WeatherApiClient,
s3: S3Resource,
):
...
hourly_forecast
每天 06:00 UTC 运行,获取配置城市(NYC、Philadelphia、Chicago、DC)的 7 天预报。使用 AutomationCondition.on_cron() 确保每天早晨都有最新数据。
@dg.asset(
key_prefix=["raw"],
retry_policy=RetryPolicy(max_retries=3, delay=10),
group_name="weather_etl",
metadata={
"owner": "ybryz",
"api-url": MetadataValue.url(API_URL),
"endpoint": "/forecast",
},
kinds={"s3"},
automation_condition=dg.AutomationCondition.on_cron("0 6 * * *"), # Daily at 6 AM UTC
)
def hourly_forecast(
weather_api_client: WeatherApiClient,
s3: S3Resource,
):
...
historical_actual_weather
与 historical_forecast 类似,但从 Archive API 获取观测到的天气数据。它同样采用月度分区,并遵守 Archive API 的 5 天延迟,即只能检索 5 天前或更早的数据。
actual_weather
每 5 天运行一次,拉取最近的观测数据(10 天前至 5 天前),以在 API 延迟的情况下保持数据集的时效性。
所有原始资产遵循相同的模式:调用 WeatherApiClient 获取数据,将其转换为 Polars DataFrame,添加 extracted_at 时间戳以记录血缘,然后以按月组织的 Parquet 文件形式上传到 S3。每个资产都包含重试策略(最多 3 次重试,间隔 10 秒),以处理瞬时的 API 失败。
Staged Assets
hourly_forecast_table
一个转换资产,依赖 historical_forecast 和 hourly_forecast。它读取 S3 中所有原始预报 Parquet 文件,展开嵌套的 JSON(按小时数组),将坐标映射到城市代码,并按城市和时间戳去重。该资产使用 AutomationCondition.eager(),在上游有新数据时自动运行。
@dg.asset(
key_prefix=["staged"],
group_name="weather_etl",
kinds={"polars"},
deps=[historical_forecast, hourly_forecast],
automation_condition=dg.AutomationCondition.eager(),
)
def hourly_forecast_table():
...
actual_weather_table
观测天气数据的对应资产。它依赖 historical_actual_weather 和 actual_weather,并通过共享的 transform_weather_data() 函数应用相同的规范化逻辑。
@dg.asset(
key_prefix=["staged"],
group_name="weather_etl",
kinds={"polars"},
deps=[historical_actual_weather, actual_weather],
automation_condition=dg.AutomationCondition.eager(),
)
def actual_weather_table():
...
两个 staged 资产都包含丰富的元数据,如表结构和行数,便于通过 Dagster UI 监控数据质量并发现异常。
Resources and Dependency Injection
Dagster 的资源系统干净地处理依赖注入,避免全局状态或分散的配置。在本管道中,WeatherApiClient 资源封装了 Open‑Meteo API:
class WeatherApiClient:
def __init__(self, base_url: str, api_key: str | None = None):
self.base_url = base_url
self.api_key = api_key
def fetch(self, endpoint: str, params: dict) -> dict:
# Implementation that performs HTTP GET and returns JSON
...
# Register the resource in the Dagster definitions
weather_api_client_resource = dg.ResourceDefinition(
resource_fn=lambda _: WeatherApiClient(base_url="https://api.open-meteo.com")
)
像 weather_api_client_resource 和 S3Resource 这样的资源会被注入到需要它们的资产中,使代码保持模块化且易于测试。
Full pipeline source:
