Apache Airflow 3 初学者指南

发布: (2026年5月2日 GMT+8 08:40)
6 分钟阅读
原文: Dev.to

Source: Dev.to

什么是数据编排?

DataOps(数据运营) 中,编排是管理数据工作流(例如 ETL 流水线)的底层系统,以确保任务在正确的时间以正确的顺序运行。

什么是 DAG?

A DAG (Directed Acyclic Graph) 是一种包含所有待运行任务的模型。

术语含义
Directed任务具有特定的方向(例如,提取 → 转换)。
Acyclic没有循环依赖——如果转换依赖于提取,则提取不能依赖于转换。
Graph由任务(节点)通过依赖关系(边)连接而成的集合。

什么是任务?

任务 是 DAG 中的单个工作单元。

类比: 将 DAG 想象成管弦乐指挥,任务则是乐器。

核心 Airflow 组件

ComponentDescription
Scheduler将任务提交给执行器并触发计划的工作流。
DAG Processor读取 DAG 文件并将其组织到元数据数据库中。
Webserver用于检查、触发和调试 DAG 与任务的 Airflow UI。
DAG Folder调度器读取的专用 DAG 文件夹,用于决定何时运行哪些任务。
Metadata Database存储任务、DAG 和变量的状态。

为什么不直接使用 Cron 任务?

Cron 任务就像闹钟——它们在设定的时间运行脚本,而不考虑依赖关系。

示例:

  • extract.py 计划在 12:00 AM 运行
  • transform.py 计划在 1:30 AM 运行

如果提取过程需要 40 分钟,cron 仍会在 1:30 AM 触发转换任务,可能导致数据损坏或崩溃。

而 Airflow 则像项目经理:它尊重任务依赖,仅在上游任务成功后才运行下游任务。

示例:定义一个简单的 DAG(Python API)

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime, timedelta

# Step 1: Define your Python function
def my_function():
    # Your logic here
    pass

# Step 2: Set default arguments
default_args = {
    'owner': 'your_name',
    'depends_on_past': False,          # don't wait for previous DAG runs
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'retries': 1,                       # retry once if it fails
    'retry_delay': timedelta(minutes=5)
}

# Step 3: Create DAG object
with DAG(
    dag_id='template_dag',                 # unique DAG identifier
    default_args=default_args,
    description='Template for new DAGs',
    schedule_interval='@daily',            # frequency of execution
    catchup=False,                         # don't run for past dates
    max_active_runs=1                      # run one instance at a time
) as dag:

    # Step 4: Define tasks
    task1 = PythonOperator(
        task_id='python_task',
        python_callable=my_function
    )

    task2 = BashOperator(
        task_id='bash_task',
        bash_command='echo "Hello World"'
    )

    # Step 5: Set dependencies
    task1 >> task2

这些指令由编排引擎解释,并使用可用资源顺序执行。这正是数据工程师所说的 Workflow‑as‑Code

示例:TaskFlow API(基于装饰器)

import json
from airflow.decorators import dag, task
from pendulum import datetime

# 1. Define the DAG using the @dag decorator
@dag(
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["example", "taskflow"],
)
def taskflow_etl_pipeline():

    # 2. Extract: returns a dictionary
    @task()
    def extract():
        data_string = '{"1001": 30.5, "1002": 28.2, "1003": 31.1}'
        return json.loads(data_string)

    # 3. Transform: receives data from the upstream task
    @task()
    def transform(raw_data: dict):
        total_value = sum(raw_data.values())
        return {"total": total_value, "count": len(raw_data)}

    # 4. Load: final task to "load" or print the data
    @task()
    def load(processed_data: dict):
        print(f"Loading data: Total value is {processed_data['total']}")

    # 5. Define dependencies by calling the functions
    raw_data = extract()
    summary = transform(raw_data)
    load(summary)

# Instantiate the DAG
taskflow_etl_pipeline()

如何验证您的 DAG 正在运行

  • CLI:

    airflow dags list               # Shows parsed DAGs
    airflow dags list-import-errors # Shows syntax errors, if any
  • Web UI: 访问 http://localhost:8080 检查 DAG 状态、日志,并手动触发运行。

可扩展工作流的最佳实践

  1. 幂等性 – 任务在相同执行日期多次运行时应产生相同的结果。

  2. 原子性 – 每个任务应只执行一个明确的操作。如果转换阶段失败,只需重试该特定任务,而无需重新获取所有原始数据。

    Monolith vs. Modular
    左侧 – 单体;右侧 – 模块化。

  3. 封装 – 只在顶层定义 DAG 结构。繁重的数据处理、API 调用或数据库查询不应放在文件的全局作用域中,因为调度器在每次解析文件时都会执行这些代码,可能导致 Airflow 实例崩溃。

要点

Apache Airflow 起初可能看起来令人生畏,但其核心不过是一个旨在将混乱变为有序的工具。通过拥抱编排,你可以将孤立的、手动运行的脚本转化为可靠的、自动化的数据管道。

祝编排愉快!

数据编排

  • 数据编排对数据管道至关重要;它确保您的数据任务按正确的顺序和时间运行。

DAGs

  • DAG 是蓝图;它们提供任务及其依赖关系的映射,确保没有任务出现顺序错误。

Airflow

  • Airflow 通过处理执行和监控任务的后勤工作来完成繁重的工作,让您可以专注于业务逻辑。

工作流即代码

  • 无论您使用传统运算符还是现代的、符合 Python 风格的 TaskFlow API,您都可以灵活地定义复杂的流水线。
0 浏览
Back to Blog

相关文章

阅读更多 »

DAG 工作流引擎

DAG 工作流引擎:一个面向生产的 DAG(有向无环图)工作流引擎,由 YAML DSL 驱动。验证、执行并可视化工作流,支持…