Apache Airflow 3.2.0 安装与配置指南(使用 PostgreSQL)并运行你的第一个 DAG

发布: (2026年5月3日 GMT+8 02:46)
13 分钟阅读
原文: Dev.to

Source: Dev.to

安装和配置 Apache Airflow 3.2.0(使用 PostgreSQL)并运行你的第一个 DAG

本文档将一步步演示如何在本地机器上使用 PostgreSQL 作为元数据库来安装、配置 Apache Airflow 3.2.0,并创建、运行一个简单的 DAG。整个过程在 Linux/macOS 环境下完成,Windows 用户可以使用 WSL。


目录

  1. 前置条件
  2. 安装 PostgreSQL
  3. 创建 Airflow 数据库和用户
  4. 安装 Airflow(使用 pip
  5. 配置 airflow.cfg
  6. 初始化元数据库
  7. 创建管理员账户
  8. 启动 Web Server 与 Scheduler
  9. 编写并运行第一个 DAG
  10. 常见问题排查

前置条件

项目版本/要求
Python3.93.11(推荐 3.10)
pip最新版(python -m pip install --upgrade pip
virtualenv推荐使用(python -m pip install virtualenv
PostgreSQL13 或更高(本示例使用 14)
Git任意(可选,用于克隆示例 DAG)

提示:如果你已经在系统中安装了 PostgreSQL,请直接跳到「创建 Airflow 数据库和用户」章节。


安装 PostgreSQL

macOS(使用 Homebrew)

brew update
brew install postgresql@14
brew services start postgresql@14

Ubuntu/Debian

sudo apt update
sudo apt install postgresql postgresql-contrib
sudo systemctl enable --now postgresql

验证安装

psql --version
# 输出类似:psql (PostgreSQL) 14.9

创建 Airflow 数据库和用户

切换到 postgres 系统用户并进入 psql

sudo -i -u postgres
psql

psql 提示符下执行:

CREATE DATABASE airflow;
CREATE USER airflow_user WITH ENCRYPTED PASSWORD 'your_strong_password';
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow_user;
\q

注意:请把 your_strong_password 替换为实际的强密码,并在后续 airflow.cfg 中保持一致。


安装 Airflow(使用 pip

  1. 创建并激活虚拟环境
mkdir ~/airflow && cd ~/airflow
python -m venv venv
source venv/bin/activate
  1. 安装 Airflow 及 PostgreSQL 依赖
export AIRFLOW_VERSION=3.2.0
export PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" \
    --constraint "${CONSTRAINT_URL}" \
    psycopg2-binary

解释constraints 文件确保所有依赖版本与 Airflow 兼容。


配置 airflow.cfg

初始化后会在 ~/airflow/airflow.cfg 中生成默认配置。我们只需要修改以下几项:

[core]
# 使用 PostgreSQL 作为元数据库
sql_alchemy_conn = postgresql+psycopg2://airflow_user:your_strong_password@localhost/airflow

# 关闭本地 SQLite(默认)
executor = LocalExecutor

# DAG 文件默认存放路径
dags_folder = /home/your_username/airflow/dags

提示:将 your_strong_passwordyour_username 替换为实际值。


初始化元数据库

airflow db init

此命令会创建所需的表结构并写入默认记录。


创建管理员账户

airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

系统会提示输入密码,请记住此密码用于登录 Web UI。


启动 Web Server 与 Scheduler

在同一个虚拟环境中打开两个终端(或使用 tmux/screen):

# 终端 1:Web Server(默认端口 8080)
airflow webserver --port 8080
# 终端 2:Scheduler
airflow scheduler

打开浏览器访问 http://localhost:8080,使用上一步创建的 admin 账户登录。


编写并运行第一个 DAG

~/airflow/dags 目录下创建文件 example_dag.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="example_hello_world",
    default_args=default_args,
    description="A simple hello world DAG",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        bash_command="sleep 5",
    )

    t3 = BashOperator(
        task_id="echo_hello",
        bash_command='echo "Hello Airflow!"',
    )

    t1 >> t2 >> t3

保存后,刷新 Airflow UI,你会看到 example_hello_world DAG 出现在列表中。点击 Trigger DAG 手动运行,随后可以在 Graph ViewTree View 中观察任务执行情况。


常见问题排查

症状可能原因解决办法
Web UI 报错 ConnectionRefusedErrorPostgreSQL 未启动或连接信息错误确认 postgresql 服务已运行,检查 airflow.cfg 中的 sql_alchemy_conn
Scheduler 卡住不调度executor 配置不匹配对于本地开发使用 LocalExecutor,如需分布式请改为 CeleryExecutor 并配置 broker
DAG 未显示dags_folder 路径错误或文件权限不足确认 airflow.cfg 中的 dags_folder 与实际路径一致,且文件可读
任务失败,提示 psycopg2.OperationalError数据库密码错误或用户权限不足重新检查 PostgreSQL 用户权限,或在 airflow.cfg 中更新密码后重新启动服务

小结

  • 使用 PostgreSQL 替代默认的 SQLite,可获得更好的并发和持久化能力。
  • 通过 virtualenv 隔离依赖,确保不同项目之间互不干扰。
  • 完成上述步骤后,你已经拥有一个可在本地运行的完整 Airflow 环境,并成功执行了第一个 DAG。
  • 接下来可以探索 OperatorsSensorsHooks,以及将 CeleryExecutorKubernetesExecutor 结合使用,实现真正的生产级调度。

祝你玩得开心,Happy Airflow! 🚀

介绍

作为数据工程师,您可能最近了解了 Apache Airflow,它是什么,以及它如何编排和自动化数据工作流。
下一步是通过在您自己的环境中部署它来获得实践经验。

本文提供了逐步指南,内容包括:

  • 安装和配置 Apache Airflow
  • 将其连接到 PostgreSQL
  • 运行您的第一个 DAG

完成后,您将拥有一个功能完整的 Airflow 环境,准备好构建和管理数据管道。

我们将遵循官方安装指南。

前置条件

  • Linux 环境(例如,Linux VPS)
  • 已安装 Python 3
sudo apt install python-is-python3   # makes `python` point to Python 3

1. 设置 Airflow 主目录

Airflow 默认使用 ~/airflow,但你可以选择其他位置。
在安装 Airflow 之前 设置环境变量:

export AIRFLOW_HOME=~/airflow

2. 创建项目文件夹并设置虚拟环境

cd ~                     # go to your home directory
mkdir airflow && cd airflow
python -m venv airflow_venv
source airflow_venv/bin/activate

升级 pip

pip install --upgrade pip

3. 安装 Apache Airflow

指定您想要的 Airflow 版本(示例:3.2.0)以及相匹配的 Python 版本约束:

pip install apache-airflow[celery]==3.2.0 \
    --constraint https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.12.txt

等待几秒钟让安装完成,然后验证:

airflow version

4. 启动 Airflow

4.1 使用 Airflow Standalone(快速入门)

airflow standalone

此命令会启动所有组件,但日志会直接打印到终端,导致后续操作被阻塞。

在后台运行并重定向日志:

nohup airflow standalone > airflow.log 2>&1 &

检查进程:

ps aux | grep airflow

在浏览器打开 Web UI,地址为 http://<your-ip>:8080(例如 http://102.209.32.65:8080)。

4.2 手动启动各组件

如果你想自行启动每个服务:

airflow db migrate

airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

airflow api-server --port 8080
airflow scheduler
airflow dag-processor
airflow triggerer

注意: 在 Airflow 3+ 中,上述命令需要 Flask‑AppBuilder(FAB)认证管理器。

启用 FAB 认证管理器

编辑 airflow.cfg(默认位置:$AIRFLOW_HOME/airflow.cfg):

nano $AIRFLOW_HOME/airflow.cfg
# 添加/确保以下内容:
auth_manager = airflow.providers.fab.auth_manager.fab_auth_manager.FABAuthManager

如果出现

ModuleNotFoundError: No module named 'airflow.providers.fab'

请安装缺失的 provider:

pip install apache-airflow-providers-fab

再次执行迁移:

airflow db migrate

创建管理员用户(如果尚未创建),并在后台启动各服务:

nohup airflow api-server --port 8080 > api-server.log 2>&1 &
nohup airflow scheduler               > scheduler.log 2>&1 &
nohup airflow dag-processor           > dag-processor.log 2>&1 &
nohup airflow triggerer               > triggerer.log 2>&1 &

现在可以通过浏览器 UI 访问 Airflow。

5. 调整 Airflow 配置

在编辑之前,先停止所有正在运行的 Airflow 进程:

pkill -9 airflow

打开配置文件:

nano $AIRFLOW_HOME/airflow.cfg

常见更改(可选)

设置期望值备注
dags_folder/root/workflows存放 DAG 文件的位置
default_timezoneyour/local/timezone(例如 Europe/Paris将时间戳对齐到您所在的时区
executorLocalExecutor本地运行且需要并行任务时使用
sql_alchemy_connpostgresql+psycopg2://user:password@localhost:5432/airflowdb指向外部 PostgreSQL 实例
load_examplesFalse禁用 Airflow 自带的示例 DAG

保存(Ctrl+S)并退出(Ctrl+X)。

6. 安装数据库驱动

在已激活的虚拟环境中:

pip install psycopg2-binary   # PostgreSQL driver
pip install asyncpg           # Async PostgreSQL driver (optional but recommended)

运行迁移以在新数据库中创建 Airflow 表:

airflow db migrate

7. 添加你的第一个 DAG

创建你在 dags_folder 中指定的目录(例如 /root/workflows),并添加一个简单的 DAG:

mkdir -p /root/workflows
cd /root/workflows
nano simple.py

将以下 Python 代码粘贴进去并保存文件:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.standard.operators.python import PythonOperator

def say_hello():
    print("Hello from Airflow!")

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="hello_world",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    hello_task = PythonOperator(
        task_id="say_hello",
        python_callable=say_hello,
    )

保存后,刷新 Airflow UI —— hello_world DAG 应该会出现并准备运行。

🎉 您现在拥有一个功能完整的 Apache Airflow 安装,已连接 PostgreSQL,并运行了您的第一个 DAG! 🎉

随时探索更复杂的 DAG,集成更多提供者,并根据需要扩展执行器。祝数据工程愉快!

简单的 Airflow DAG 示例

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

def say_hello():
    print("Hello from Airflow!")

def say_goodbye():
    print("Goodbye from Airflow!")

with DAG(
    dag_id="simple_dag",
    start_date=datetime(2026, 1, 1),
    schedule_interval=timedelta(minutes=5),
    catchup=False,
) as dag:

    hello_task = PythonOperator(
        task_id="hi",
        python_callable=say_hello,
    )

    goodbye_task = PythonOperator(
        task_id="bye",
        python_callable=say_goodbye,
    )

    hello_task >> goodbye_task

注意: Airflow 会自动检测并加载 DAG。DAG 会列在 UI 的 DAGs 部分。

查看 DAG

  • 点击 DAG 名称以查看其详细信息,包括运行历史和成功/失败状态。

小结

在本文中,您已经:

  • 成功安装并配置了 Apache Airflow 3.2.0
  • 将 Airflow 连接到 PostgreSQL 后端。
  • 探索了启动 Airflow 的两种方式:
    1. 简化的 standalone(独立)方式。
    2. 使用手动创建的用户和独立的 Airflow 服务的 production‑style(生产式)部署。
  • airflow.cfg 文件中进行了必要的配置更改。
  • 将您的第一个 DAG 部署到 Airflow 环境中。

有了此基础,您现在拥有一个功能齐全的编排平台,能够调度、监控和管理数据工作流。随着您继续学习 Airflow,您可以深入更高级的主题,例如:

  • 复杂的任务依赖关系
  • 高级调度策略
  • 与云平台的集成
  • 构建生产级别的 ETL 和数据工程流水线

编排愉快!

0 浏览
Back to Blog

相关文章

阅读更多 »

DAG 工作流引擎

DAG 工作流引擎:一个面向生产的 DAG(有向无环图)工作流引擎,由 YAML DSL 驱动。验证、执行并可视化工作流,支持…

Apache Airflow 3 初学者指南

数据编排与 Apache Airflow – 初学者指南 如果“编排”或 “Apache Airflow” 这些术语听起来像令人望而生畏的行业行话,本文将……

Python 入门指南

今天我开始学习 Python,并且探索了一些基本概念,这些概念帮助我了解 Python 在幕后是如何实际工作的。Python 是什么?...