Apache Airflow 3.2.0 安装与配置指南(使用 PostgreSQL)并运行你的第一个 DAG
Source: Dev.to
安装和配置 Apache Airflow 3.2.0(使用 PostgreSQL)并运行你的第一个 DAG
本文档将一步步演示如何在本地机器上使用 PostgreSQL 作为元数据库来安装、配置 Apache Airflow 3.2.0,并创建、运行一个简单的 DAG。整个过程在 Linux/macOS 环境下完成,Windows 用户可以使用 WSL。
目录
- 前置条件
- 安装 PostgreSQL
- 创建 Airflow 数据库和用户
- 安装 Airflow(使用
pip) - 配置
airflow.cfg - 初始化元数据库
- 创建管理员账户
- 启动 Web Server 与 Scheduler
- 编写并运行第一个 DAG
- 常见问题排查
前置条件
| 项目 | 版本/要求 |
|---|---|
| Python | 3.9 – 3.11(推荐 3.10) |
| pip | 最新版(python -m pip install --upgrade pip) |
| virtualenv | 推荐使用(python -m pip install virtualenv) |
| PostgreSQL | 13 或更高(本示例使用 14) |
| Git | 任意(可选,用于克隆示例 DAG) |
提示:如果你已经在系统中安装了 PostgreSQL,请直接跳到「创建 Airflow 数据库和用户」章节。
安装 PostgreSQL
macOS(使用 Homebrew)
brew update
brew install postgresql@14
brew services start postgresql@14
Ubuntu/Debian
sudo apt update
sudo apt install postgresql postgresql-contrib
sudo systemctl enable --now postgresql
验证安装
psql --version
# 输出类似:psql (PostgreSQL) 14.9
创建 Airflow 数据库和用户
切换到 postgres 系统用户并进入 psql:
sudo -i -u postgres
psql
在 psql 提示符下执行:
CREATE DATABASE airflow;
CREATE USER airflow_user WITH ENCRYPTED PASSWORD 'your_strong_password';
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow_user;
\q
注意:请把
your_strong_password替换为实际的强密码,并在后续airflow.cfg中保持一致。
安装 Airflow(使用 pip)
- 创建并激活虚拟环境
mkdir ~/airflow && cd ~/airflow
python -m venv venv
source venv/bin/activate
- 安装 Airflow 及 PostgreSQL 依赖
export AIRFLOW_VERSION=3.2.0
export PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" \
--constraint "${CONSTRAINT_URL}" \
psycopg2-binary
解释:
constraints文件确保所有依赖版本与 Airflow 兼容。
配置 airflow.cfg
初始化后会在 ~/airflow/airflow.cfg 中生成默认配置。我们只需要修改以下几项:
[core]
# 使用 PostgreSQL 作为元数据库
sql_alchemy_conn = postgresql+psycopg2://airflow_user:your_strong_password@localhost/airflow
# 关闭本地 SQLite(默认)
executor = LocalExecutor
# DAG 文件默认存放路径
dags_folder = /home/your_username/airflow/dags
提示:将
your_strong_password、your_username替换为实际值。
初始化元数据库
airflow db init
此命令会创建所需的表结构并写入默认记录。
创建管理员账户
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
系统会提示输入密码,请记住此密码用于登录 Web UI。
启动 Web Server 与 Scheduler
在同一个虚拟环境中打开两个终端(或使用 tmux/screen):
# 终端 1:Web Server(默认端口 8080)
airflow webserver --port 8080
# 终端 2:Scheduler
airflow scheduler
打开浏览器访问 http://localhost:8080,使用上一步创建的 admin 账户登录。
编写并运行第一个 DAG
在 ~/airflow/dags 目录下创建文件 example_dag.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="example_hello_world",
default_args=default_args,
description="A simple hello world DAG",
schedule_interval=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
bash_command="sleep 5",
)
t3 = BashOperator(
task_id="echo_hello",
bash_command='echo "Hello Airflow!"',
)
t1 >> t2 >> t3
保存后,刷新 Airflow UI,你会看到 example_hello_world DAG 出现在列表中。点击 Trigger DAG 手动运行,随后可以在 Graph View 或 Tree View 中观察任务执行情况。
常见问题排查
| 症状 | 可能原因 | 解决办法 |
|---|---|---|
Web UI 报错 ConnectionRefusedError | PostgreSQL 未启动或连接信息错误 | 确认 postgresql 服务已运行,检查 airflow.cfg 中的 sql_alchemy_conn |
| Scheduler 卡住不调度 | executor 配置不匹配 | 对于本地开发使用 LocalExecutor,如需分布式请改为 CeleryExecutor 并配置 broker |
| DAG 未显示 | dags_folder 路径错误或文件权限不足 | 确认 airflow.cfg 中的 dags_folder 与实际路径一致,且文件可读 |
任务失败,提示 psycopg2.OperationalError | 数据库密码错误或用户权限不足 | 重新检查 PostgreSQL 用户权限,或在 airflow.cfg 中更新密码后重新启动服务 |
小结
- 使用 PostgreSQL 替代默认的 SQLite,可获得更好的并发和持久化能力。
- 通过 virtualenv 隔离依赖,确保不同项目之间互不干扰。
- 完成上述步骤后,你已经拥有一个可在本地运行的完整 Airflow 环境,并成功执行了第一个 DAG。
- 接下来可以探索 Operators、Sensors、Hooks,以及将 CeleryExecutor 与 KubernetesExecutor 结合使用,实现真正的生产级调度。
祝你玩得开心,Happy Airflow! 🚀
介绍
作为数据工程师,您可能最近了解了 Apache Airflow,它是什么,以及它如何编排和自动化数据工作流。
下一步是通过在您自己的环境中部署它来获得实践经验。
本文提供了逐步指南,内容包括:
- 安装和配置 Apache Airflow
- 将其连接到 PostgreSQL
- 运行您的第一个 DAG
完成后,您将拥有一个功能完整的 Airflow 环境,准备好构建和管理数据管道。
我们将遵循官方安装指南。
前置条件
- Linux 环境(例如,Linux VPS)
- 已安装 Python 3
sudo apt install python-is-python3 # makes `python` point to Python 3
1. 设置 Airflow 主目录
Airflow 默认使用 ~/airflow,但你可以选择其他位置。
在安装 Airflow 之前 设置环境变量:
export AIRFLOW_HOME=~/airflow
2. 创建项目文件夹并设置虚拟环境
cd ~ # go to your home directory
mkdir airflow && cd airflow
python -m venv airflow_venv
source airflow_venv/bin/activate
升级 pip:
pip install --upgrade pip
3. 安装 Apache Airflow
指定您想要的 Airflow 版本(示例:3.2.0)以及相匹配的 Python 版本约束:
pip install apache-airflow[celery]==3.2.0 \
--constraint https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.12.txt
等待几秒钟让安装完成,然后验证:
airflow version
4. 启动 Airflow
4.1 使用 Airflow Standalone(快速入门)
airflow standalone
此命令会启动所有组件,但日志会直接打印到终端,导致后续操作被阻塞。
在后台运行并重定向日志:
nohup airflow standalone > airflow.log 2>&1 &
检查进程:
ps aux | grep airflow
在浏览器打开 Web UI,地址为 http://<your-ip>:8080(例如 http://102.209.32.65:8080)。
4.2 手动启动各组件
如果你想自行启动每个服务:
airflow db migrate
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email spiderman@superhero.org
airflow api-server --port 8080
airflow scheduler
airflow dag-processor
airflow triggerer
注意: 在 Airflow 3+ 中,上述命令需要 Flask‑AppBuilder(FAB)认证管理器。
启用 FAB 认证管理器
编辑 airflow.cfg(默认位置:$AIRFLOW_HOME/airflow.cfg):
nano $AIRFLOW_HOME/airflow.cfg
# 添加/确保以下内容:
auth_manager = airflow.providers.fab.auth_manager.fab_auth_manager.FABAuthManager
如果出现
ModuleNotFoundError: No module named 'airflow.providers.fab'
请安装缺失的 provider:
pip install apache-airflow-providers-fab
再次执行迁移:
airflow db migrate
创建管理员用户(如果尚未创建),并在后台启动各服务:
nohup airflow api-server --port 8080 > api-server.log 2>&1 &
nohup airflow scheduler > scheduler.log 2>&1 &
nohup airflow dag-processor > dag-processor.log 2>&1 &
nohup airflow triggerer > triggerer.log 2>&1 &
现在可以通过浏览器 UI 访问 Airflow。
5. 调整 Airflow 配置
在编辑之前,先停止所有正在运行的 Airflow 进程:
pkill -9 airflow
打开配置文件:
nano $AIRFLOW_HOME/airflow.cfg
常见更改(可选)
| 设置 | 期望值 | 备注 |
|---|---|---|
dags_folder | /root/workflows | 存放 DAG 文件的位置 |
default_timezone | your/local/timezone(例如 Europe/Paris) | 将时间戳对齐到您所在的时区 |
executor | LocalExecutor | 本地运行且需要并行任务时使用 |
sql_alchemy_conn | postgresql+psycopg2://user:password@localhost:5432/airflowdb | 指向外部 PostgreSQL 实例 |
load_examples | False | 禁用 Airflow 自带的示例 DAG |
保存(Ctrl+S)并退出(Ctrl+X)。
6. 安装数据库驱动
在已激活的虚拟环境中:
pip install psycopg2-binary # PostgreSQL driver
pip install asyncpg # Async PostgreSQL driver (optional but recommended)
运行迁移以在新数据库中创建 Airflow 表:
airflow db migrate
7. 添加你的第一个 DAG
创建你在 dags_folder 中指定的目录(例如 /root/workflows),并添加一个简单的 DAG:
mkdir -p /root/workflows
cd /root/workflows
nano simple.py
将以下 Python 代码粘贴进去并保存文件:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.standard.operators.python import PythonOperator
def say_hello():
print("Hello from Airflow!")
default_args = {
"owner": "airflow",
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="hello_world",
start_date=datetime(2024, 1, 1),
schedule_interval="@daily",
default_args=default_args,
catchup=False,
) as dag:
hello_task = PythonOperator(
task_id="say_hello",
python_callable=say_hello,
)
保存后,刷新 Airflow UI —— hello_world DAG 应该会出现并准备运行。
🎉 您现在拥有一个功能完整的 Apache Airflow 安装,已连接 PostgreSQL,并运行了您的第一个 DAG! 🎉
随时探索更复杂的 DAG,集成更多提供者,并根据需要扩展执行器。祝数据工程愉快!
简单的 Airflow DAG 示例
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
def say_hello():
print("Hello from Airflow!")
def say_goodbye():
print("Goodbye from Airflow!")
with DAG(
dag_id="simple_dag",
start_date=datetime(2026, 1, 1),
schedule_interval=timedelta(minutes=5),
catchup=False,
) as dag:
hello_task = PythonOperator(
task_id="hi",
python_callable=say_hello,
)
goodbye_task = PythonOperator(
task_id="bye",
python_callable=say_goodbye,
)
hello_task >> goodbye_task
注意: Airflow 会自动检测并加载 DAG。DAG 会列在 UI 的 DAGs 部分。
查看 DAG
- 点击 DAG 名称以查看其详细信息,包括运行历史和成功/失败状态。
小结
在本文中,您已经:
- 成功安装并配置了 Apache Airflow 3.2.0。
- 将 Airflow 连接到 PostgreSQL 后端。
- 探索了启动 Airflow 的两种方式:
- 简化的 standalone(独立)方式。
- 使用手动创建的用户和独立的 Airflow 服务的 production‑style(生产式)部署。
- 在
airflow.cfg文件中进行了必要的配置更改。 - 将您的第一个 DAG 部署到 Airflow 环境中。
有了此基础,您现在拥有一个功能齐全的编排平台,能够调度、监控和管理数据工作流。随着您继续学习 Airflow,您可以深入更高级的主题,例如:
- 复杂的任务依赖关系
- 高级调度策略
- 与云平台的集成
- 构建生产级别的 ETL 和数据工程流水线
编排愉快!