Apache Airflow 3.2.0을 PostgreSQL과 함께 설치 및 구성하는 가이드와 첫 번째 DAG 실행
Source: Dev.to
소개
데이터 엔지니어로서 최근에 Apache Airflow에 대해 배우고, 그것이 무엇이며 데이터 워크플로를 어떻게 오케스트레이션하고 자동화하는지 알게 되었을 수도 있습니다.
다음 단계는 직접 환경에 설치하여 실습 경험을 쌓는 것입니다.
이 문서는 단계별 가이드를 제공합니다:
- Apache Airflow 설치 및 구성
- PostgreSQL에 연결
- 첫 번째 DAG 실행
끝까지 읽으면 데이터 파이프라인을 구축하고 관리할 수 있는 완전한 Airflow 환경을 갖추게 됩니다.
공식 설치 가이드를 따르겠습니다.
사전 요구 사항
- Linux 환경 (예: Linux VPS)
- Python 3 설치
sudo apt install python-is-python3 # makes `python` point to Python 3
1. Airflow 홈 디렉터리 설정
Airflow는 기본적으로 ~/airflow를 사용하지만, 다른 위치를 선택할 수 있습니다.
Airflow를 설치하기 전에 환경 변수를 설정하세요:
export AIRFLOW_HOME=~/airflow
2. 프로젝트 폴더 및 가상 환경 만들기
cd ~ # go to your home directory
mkdir airflow && cd airflow
python -m venv airflow_venv
source airflow_venv/bin/activate
pip 업그레이드:
pip install --upgrade pip
3. Apache Airflow 설치
원하는 Airflow 버전을 지정합니다(예: 3.2.0) 및 해당 Python 버전 제약 조건을 맞춥니다:
pip install apache-airflow[celery]==3.2.0 \
--constraint https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.12.txt
설치가 완료될 때까지 몇 초 정도 기다린 뒤, 다음 명령으로 확인합니다:
airflow version
4. Airflow 시작
4.1 Airflow Standalone 사용 (빠른 시작)
airflow standalone
이 명령은 모든 구성 요소를 시작하지만 로그가 터미널에 출력되어 이후 작업이 차단됩니다.
백그라운드에서 실행하고 로그를 리다이렉트합니다:
nohup airflow standalone > airflow.log 2>&1 &
프로세스를 확인합니다:
ps aux | grep airflow
http://<your-ip>:8080 (예: http://102.209.32.65:8080) 에서 웹 UI에 접속합니다.
4.2 구성 요소를 수동으로 실행
각 서비스를 직접 시작하고 싶다면:
airflow db migrate
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email spiderman@superhero.org
airflow api-server --port 8080
airflow scheduler
airflow dag-processor
airflow triggerer
Note: Airflow 3+에서는 위 명령어들이 Flask‑AppBuilder (FAB) 인증 관리자를 필요로 합니다.
FAB 인증 관리자 활성화
airflow.cfg (기본 위치: $AIRFLOW_HOME/airflow.cfg) 를 편집합니다:
nano $AIRFLOW_HOME/airflow.cfg
# 추가/확인:
auth_manager = airflow.providers.fab.auth_manager.fab_auth_manager.FABAuthManager
다음과 같은 오류가 발생하면
ModuleNotFoundError: No module named 'airflow.providers.fab'
누락된 프로바이더를 설치합니다:
pip install apache-airflow-providers-fab
마이그레이션을 다시 실행합니다:
airflow db migrate
관리자 사용자를 생성하고(아직 생성하지 않았다면) 서비스를 백그라운드에서 시작합니다:
nohup airflow api-server --port 8080 > api-server.log 2>&1 &
nohup airflow scheduler > scheduler.log 2>&1 &
nohup airflow dag-processor > dag-processor.log 2>&1 &
nohup airflow triggerer > triggerer.log 2>&1 &
이제 브라우저 UI를 통해 Airflow에 접속할 수 있습니다.
5. Airflow 구성 조정
편집하기 전에 실행 중인 Airflow 프로세스를 중지합니다:
pkill -9 airflow
구성 파일을 엽니다:
nano $AIRFLOW_HOME/airflow.cfg
일반적인 변경 사항 (선택 사항)
| 설정 | 원하는 값 | 비고 |
|---|---|---|
dags_folder | /root/workflows | DAG 파일을 저장할 위치 |
default_timezone | your/local/timezone (예: Europe/Paris) | 타임스탬프를 지역에 맞게 맞춤 |
executor | LocalExecutor | 로컬에서 실행하고 병렬 작업이 필요할 때 사용 |
sql_alchemy_conn | postgresql+psycopg2://user:password@localhost:5432/airflowdb | 외부 PostgreSQL 인스턴스를 지정 |
load_examples | False | Airflow와 함께 제공되는 예제 DAG를 비활성화 |
저장(Ctrl+S)하고 종료(Ctrl+X)합니다.
6. 데이터베이스 드라이버 설치
활성화된 가상 환경 안에서:
pip install psycopg2-binary # PostgreSQL driver
pip install asyncpg # Async PostgreSQL driver (optional but recommended)
새 데이터베이스에 Airflow 테이블을 생성하기 위해 마이그레이션을 실행합니다:
airflow db migrate
7. 첫 번째 DAG 추가
dags_folder 로 지정한 디렉터리(예: /root/workflows)를 만들고 간단한 DAG를 추가합니다:
mkdir -p /root/workflows
cd /root/workflows
nano simple.py
다음 Python 코드를 붙여넣고 파일을 저장합니다:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.standard.operators.python import PythonOperator
def say_hello():
print("Hello from Airflow!")
default_args = {
"owner": "airflow",
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="hello_world",
start_date=datetime(2024, 1, 1),
schedule_interval="@daily",
default_args=default_args,
catchup=False,
) as dag:
hello_task = PythonOperator(
task_id="say_hello",
python_callable=say_hello,
)
저장한 뒤 Airflow UI를 새로 고치면 hello_world DAG가 나타나며 실행 준비가 완료됩니다.
🎉 이제 Apache Airflow 설치가 완전히 작동하고 PostgreSQL에 연결되었으며 첫 번째 DAG를 실행했습니다! 🎉
보다 복잡한 DAG를 탐색하고, 추가 제공자를 통합하며, 필요에 따라 실행자를 확장해 보세요. 즐거운 데이터 엔지니어링 되세요!
간단한 Airflow DAG 예제
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
def say_hello():
print("Hello from Airflow!")
def say_goodbye():
print("Goodbye from Airflow!")
with DAG(
dag_id="simple_dag",
start_date=datetime(2026, 1, 1),
schedule_interval=timedelta(minutes=5),
catchup=False,
) as dag:
hello_task = PythonOperator(
task_id="hi",
python_callable=say_hello,
)
goodbye_task = PythonOperator(
task_id="bye",
python_callable=say_goodbye,
)
hello_task >> goodbye_task
Note: Airflow는 DAG를 자동으로 감지하고 로드합니다. DAG는 UI의 DAGs 섹션에 나열됩니다.
DAG 보기
- DAG 이름을 클릭하면 실행 기록 및 성공/실패 상태를 포함한 상세 정보를 확인할 수 있습니다.
요약
이 문서에서 여러분은:
- Apache Airflow 3.2.0을 성공적으로 설치하고 구성했습니다.
- Airflow를 PostgreSQL 백엔드에 연결했습니다.
- Airflow를 실행하는 두 가지 방법을 살펴보았습니다:
- 간소화된 standalone 방식.
- 수동으로 사용자와 개별 Airflow 서비스를 생성하는 production‑style 설정.
airflow.cfg파일에서 필수 구성 변경을 수행했습니다.- 첫 번째 DAG를 Airflow 환경에 배포했습니다.
이 기반을 통해 이제 스케줄링, 모니터링 및 데이터 워크플로우 관리가 가능한 기능적인 오케스트레이션 플랫폼을 갖추게 되었습니다. Airflow 학습을 이어가면서 다음과 같은 고급 주제로 파고들 수 있습니다:
- 복잡한 작업 의존성
- 고급 스케줄링 전략
- 클라우드 플랫폼과의 통합
- 프로덕션 수준의 ETL 및 데이터 엔지니어링 파이프라인 구축
즐거운 오케스트레이션 되세요!