Apache Airflow 3 初学者指南
Source: Dev.to
什么是数据编排?
在 DataOps(数据运营) 中,编排是管理数据工作流(例如 ETL 流水线)的底层系统,以确保任务在正确的时间以正确的顺序运行。
什么是 DAG?
A DAG (Directed Acyclic Graph) 是一种包含所有待运行任务的模型。
| 术语 | 含义 |
|---|---|
| Directed | 任务具有特定的方向(例如,提取 → 转换)。 |
| Acyclic | 没有循环依赖——如果转换依赖于提取,则提取不能依赖于转换。 |
| Graph | 由任务(节点)通过依赖关系(边)连接而成的集合。 |
什么是任务?
任务 是 DAG 中的单个工作单元。
类比: 将 DAG 想象成管弦乐指挥,任务则是乐器。
核心 Airflow 组件
| Component | Description |
|---|---|
| Scheduler | 将任务提交给执行器并触发计划的工作流。 |
| DAG Processor | 读取 DAG 文件并将其组织到元数据数据库中。 |
| Webserver | 用于检查、触发和调试 DAG 与任务的 Airflow UI。 |
| DAG Folder | 调度器读取的专用 DAG 文件夹,用于决定何时运行哪些任务。 |
| Metadata Database | 存储任务、DAG 和变量的状态。 |
为什么不直接使用 Cron 任务?
Cron 任务就像闹钟——它们在设定的时间运行脚本,而不考虑依赖关系。
示例:
extract.py计划在 12:00 AM 运行transform.py计划在 1:30 AM 运行
如果提取过程需要 40 分钟,cron 仍会在 1:30 AM 触发转换任务,可能导致数据损坏或崩溃。
而 Airflow 则像项目经理:它尊重任务依赖,仅在上游任务成功后才运行下游任务。
示例:定义一个简单的 DAG(Python API)
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime, timedelta
# Step 1: Define your Python function
def my_function():
# Your logic here
pass
# Step 2: Set default arguments
default_args = {
'owner': 'your_name',
'depends_on_past': False, # don't wait for previous DAG runs
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'retries': 1, # retry once if it fails
'retry_delay': timedelta(minutes=5)
}
# Step 3: Create DAG object
with DAG(
dag_id='template_dag', # unique DAG identifier
default_args=default_args,
description='Template for new DAGs',
schedule_interval='@daily', # frequency of execution
catchup=False, # don't run for past dates
max_active_runs=1 # run one instance at a time
) as dag:
# Step 4: Define tasks
task1 = PythonOperator(
task_id='python_task',
python_callable=my_function
)
task2 = BashOperator(
task_id='bash_task',
bash_command='echo "Hello World"'
)
# Step 5: Set dependencies
task1 >> task2
这些指令由编排引擎解释,并使用可用资源顺序执行。这正是数据工程师所说的 Workflow‑as‑Code。
示例:TaskFlow API(基于装饰器)
import json
from airflow.decorators import dag, task
from pendulum import datetime
# 1. Define the DAG using the @dag decorator
@dag(
start_date=datetime(2024, 1, 1),
schedule="@daily",
catchup=False,
tags=["example", "taskflow"],
)
def taskflow_etl_pipeline():
# 2. Extract: returns a dictionary
@task()
def extract():
data_string = '{"1001": 30.5, "1002": 28.2, "1003": 31.1}'
return json.loads(data_string)
# 3. Transform: receives data from the upstream task
@task()
def transform(raw_data: dict):
total_value = sum(raw_data.values())
return {"total": total_value, "count": len(raw_data)}
# 4. Load: final task to "load" or print the data
@task()
def load(processed_data: dict):
print(f"Loading data: Total value is {processed_data['total']}")
# 5. Define dependencies by calling the functions
raw_data = extract()
summary = transform(raw_data)
load(summary)
# Instantiate the DAG
taskflow_etl_pipeline()
如何验证您的 DAG 正在运行
-
CLI:
airflow dags list # Shows parsed DAGs airflow dags list-import-errors # Shows syntax errors, if any -
Web UI: 访问
http://localhost:8080检查 DAG 状态、日志,并手动触发运行。
可扩展工作流的最佳实践
-
幂等性 – 任务在相同执行日期多次运行时应产生相同的结果。
-
原子性 – 每个任务应只执行一个明确的操作。如果转换阶段失败,只需重试该特定任务,而无需重新获取所有原始数据。

左侧 – 单体;右侧 – 模块化。 -
封装 – 只在顶层定义 DAG 结构。繁重的数据处理、API 调用或数据库查询不应放在文件的全局作用域中,因为调度器在每次解析文件时都会执行这些代码,可能导致 Airflow 实例崩溃。
要点
Apache Airflow 起初可能看起来令人生畏,但其核心不过是一个旨在将混乱变为有序的工具。通过拥抱编排,你可以将孤立的、手动运行的脚本转化为可靠的、自动化的数据管道。
祝编排愉快!
数据编排
- 数据编排对数据管道至关重要;它确保您的数据任务按正确的顺序和时间运行。
DAGs
- DAG 是蓝图;它们提供任务及其依赖关系的映射,确保没有任务出现顺序错误。
Airflow
- Airflow 通过处理执行和监控任务的后勤工作来完成繁重的工作,让您可以专注于业务逻辑。
工作流即代码
- 无论您使用传统运算符还是现代的、符合 Python 风格的 TaskFlow API,您都可以灵活地定义复杂的流水线。