일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- github actions
- AWS
- XCOM
- kinesis
- testing
- pipeline
- Data Firehose
- DAG
- executor
- authoring
- Task
- TaskFlow
- sqs
- dagrun
- Concept
- Git
- ci/cd
- Scheduling
- celery
- lambda
- mysql
- airflow
- boto3
- git book
- RDS
- credentials
- Operator
- S3
- dynamic task mapping
- SecretsManager
- Today
- Total
CS
Airflow: DAG 형식 살펴보기 본문
DAG는 Airflow에서 워크플로를 가리키는 용어라고 할 수 있습니다. 반면 Task는 Airflow에서 실제로 수행할 작업을 가리키는 용어입니다.
Directed Acyclic Graph(유향 비순환 그래프)의 준말인데, 어떤 방향으로 향하고(유향) 다시 돌아갈 수는 없는(비순환) 그래프입니다. 말은 어렵지만, 생각해보면 많은 작업들이 이렇게 진행되죠.
DAG의 실체는 파이썬으로 작성되는 일종의 설정 파일이라고 할 수 있습니다. 여기에 작업 주기 등을 정하고 Task를 명시하여 이를 실행하게 만듭니다. DAG 자체가 어떠한 작업을 의미하는 것은 아닙니다.
DAG 예제 중 tutorial.py를 보며 DAG에 대해 알아보도록 하겠습니다.
(Airflow 페이지의 Fundamental Concepts 문서를 따릅니다.)
모듈 임포팅
from datetime import datetime, timedelta
from textwrap import dedent
# DAG 객체. DAG 인스턴스 생성용
from airflow import DAG
# 오퍼레이터. DAG 실행용
from airflow.operators.bash import BashOperator
BashOperator는 Airflow의 기본 오퍼레이터로, PythonOperator 같은 다른 오퍼레이터들은 모두 BashOperator를 상속합니다.
DAG 인스턴스 생성 및 기본 인수 설정
with DAG(
# dag_id. DAG 고유 식별자
"tutorial",
# 각 오퍼레이터에 입력할 인수
# 이후 Task별로 다시 인수를 정하여 오버라이드 가능
default_args={
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
# DAG 실행 스케쥴
schedule=timedelta(days=1),
# DAG 시작일
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
'tutorial'라는 ID를 가지는 DAG를 생성합니다.
- default_args: 전체 Task에 기본 값으로 사용할 인수를 DAG 단위로 정의할 수 있어, 각 task마다 적는 일을 피할 수 있습니다.
- schedule: DAG 실행 주기입니다. 여기서는 하루에 한 번으로 하였습니다.
- start_date: DAG 시작일입니다. 주의할 점은 DAG가 해당일에 실행되는 게 아니라 그 다음 주기(하루 주기 DAG의 시작일이 2021년 1월 1일 0시라면, 첫 DAG 실행은 2021년 1월 2일 0시)에 실행되는 것입니다. DAG 종료일인 end date도 같은 메커니즘으로 작동합니다. DAG는 주로 데이터 파이프라인을 실행할 때 사용되는 만큼, 시작일부터 데이터를 수집한다고 생각하면 작업이 한 주기 만큼의 데이터가 모인 시점인 다음 주기부터 실행되는 것이 자연스럽습니다. 참고로 이는 logical date라는 용어와 비교되는데, logical date는 실제로 DAG가 실행되는 날을 가리킵니다. start date는 고정된 반면 logical date는 주기마다 바뀌겠죠.
그리고 start_date가 logical_date보다 후, 즉 미래인 경우, DAG가 성공하지만 실제로는 아무 것도 이루어지지 않는다는 것을 주의해야 합니다.
오퍼레이터와 Task 설정
오퍼레이터는 말 그대로 실행하는 것으로, 이를 사용해 실제로 작업을 수행합니다. 오퍼레이터는 작업의 한 단위가 됩니다.
DAG에서 오퍼레이터를 실행하려면 이를 Task로 정의해야 합니다.
참고로 Task를 오퍼레이터를 통해 정의하는 방식은 약간 구식이고, TaskFlow API라는 Task를 파이썬 함수 형태로 정의할 수 있는 방법이 쓰이는 경우도 많다고 합니다. 다만 오퍼레이터 방식을 쓰면 DAG 내 Task 종속성을 알기 쉬운 장점이 있다고 합니다. 일단은 오퍼레이터 방식을 사용합니다.
t1 = BashOperator(
# Task 고유 식별자
task_id="print_date",
# BashOperator로 실행할 명령어
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
참고로 여기서부터의 코드는 위의 DAG를 정의한 with 문 안에 작성된 것입니다.
'print_date'와 'sleep'이라는 ID를 갖는 Task를 정의합니다.
- bash_command: BashOperator이므로, 실행할 Bash 명령어를 정합니다.
- Task의 인수는 ①명시한 것, ②위 DAG의 default_args, ③기본 값 순으로 입력됩니다. 즉, 'print_date'의 retries는 명시하지 않았으므로 default_args에서 정한 값인 1이 되고, 'sleep'에서는 명시하였으므로 3이 됩니다.
Jinja 템플릿
Airflow에서 Jinja 템플릿을 사용할 수 있습니다. 또한 Airflow는 기본적으로 Jinja로 사용할 수 있는 변수, 필터, 매크로를 제공합니다.
# 명령문 생성
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
# 위에서 만든 명령문 사용
bash_command=templated_command,
)
template_command라는 명령문을 만들어서 오퍼레이터로 실행합니다.
- dedent(): 이건 파이썬 함수인데, 문자열에서 들여쓰기를 없앱니다. Bash에 전달되는 문자열이기 때문에 오류를 피하기 위해 사용한 것 같습니다.
- '{% %}': Jinja에서 statement를 열고 닫을 때 사용합니다. 여기서는 for 문을 쓰는 데 사용되었습니다.
- '{{ }}': Jinja에서 변수나 함수를 사용할 때 씁니다. 여기서 쓰인 ds나 macros.ds_add()는 Airflow에서 제공하는 변수와 매크로입니다. ds는 date stamp로 DAG의 logical date를 'YYYY-MM-DD'로 가져옵니다. ds_add()는 앞 인수에 뒷 인수만큼 날짜를 더하는 매크로입니다. 여기서는 현재 ds에 7일을 더하겠네요.
Airflow의 Jinja 템플릿은 문서에서 확인하세요.
Task 'templated'에서는 위에서 만든 template_command를 bash_command로 사용합니다. 또한 bash_command로 DAG 파일 외부의 스크립트 파일도 실행할 수 있습니다.
그리고 Jinja로 사용할 변수나 매크로도 직접 만들 수 있습니다.
Task 의존성 설정
위에서 정한 3개의 Task는 서로 의존적이지는 않지만, 실행 순서를 정할 수 있습니다.
아래 코드는 Task 간 의존성을 정하는 여러 예시입니다.
# 1. t2가 t1에 의존
t1.set_downstream(t2)
# 위와 동일
t2.set_upstream(t1)
# 2. bit shift 연산자를 사용할 수 있음
# 1과 동일
t1 >> t2
# 위와 동일
t2 << t1
# 3. 3개 이상의 Task를 연결할 때는 bit shift 연산자가 간결함
t1 >> t2 >> t3
# 4. 하나의 Task가 여러 개의 Task와 의존성을 가질 수 있음
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
DAG에 순환 구조가 있거나, 중복적으로 명시한 경우 예외가 발생합니다.
테스트 해보기
tutorial.py는 Airflow Fundamental Concepts 문서에서 찾을 수 있습니다.
이 파일을 dags_folder에 저장합니다. airflow.cfg에서 정할 수 있는데, 기본 값은 AIRFLOW_HOME의 dags 폴더입니다. dags 폴더가 없으면 만들면 됩니다.
스크립트 실행
python ~/airflow/dags/tutorial.py
우선 파이썬으로 파일을 실행하여 파일 자체에 오류가 없는지 확인합니다.
파일 내용상 아무 것도 출력되지 않는 게 정상입니다.
명령줄에서 메타 데이터 확인
# Airflow 데이터베이스 실행
airflow db init
# DAG 목록
airflow dags list
# tutorial의 Task 목록
airflow tasks list tutorial
# tutorial의 Task를 tree로 확인
airflow tasks list tutorial --tree
Airflow로 Task 및 DAG 테스트
# 명령어 형식: command subcommand [dag_id] [task_id] [(optional) date]
# print_date 테스트
airflow tasks test tutorial print_date 2015-06-01
# sleep 테스트
airflow tasks test tutorial sleep 2015-06-01
# templated 테스트
airflow tasks test tutorial templated 2015-06-01
# DAG 테스트
airflow dags test tutorial 2015-06-01
Airflow를 사용할 때 중요한 점 하나는 실행 주기입니다. 여기서는 테스트이기 때문에 특정 날짜를 입력하여 DAG를 실행합니다. 여기서 date 인수는 DAG의 logical date(예전 execution date)가 됩니다.
예시에서는 2015년 6월 1일을 입력하였는데, 지금보다 예전이라면 뭘 입력해도 상관 없습니다. 왜냐하면 Airflow 스케쥴러는 특정 시점이 아니라 주기에 기반하기 때문입니다. 명령어를 입력하면 해당 Task가 실행됩니다. 컴퓨터 시간보다 미래를 입력한다면 아직 logical date에 도달하지 않았기 때문에 실행되지 않습니다.
test 명령어를 사용하면 로그를 stdout으로 출력하고 종속성이나 데이터베이스를 건드리지 않기 때문에 편리합니다.
Backfill
# 2015년 6월 1일 ~ 2015년 6월 7일 backfill
airflow dags backfill tutorial \
--start-date 2015-06-01 \
--end-date 2015-06-07
Backfill은 DAG 실행에서 주요한 기능으로, 말 그대로 뒤에서 다시 채우는 것입니다. 예를 들어 6월 1일을 start date로 하고 6월 7일을 end date로 하는 DAG가 있다고 합시다. 이때 backfill을 사용하면 6월 2일 작업부터 6월 8일 작업까지 7번의 작업을 한 번에 실행할 수 있습니다. 데이터가 바뀌었는 등 다시 작업을 해야 하는 경우 유용합니다. 한 번에 여러 작업을 실행하는 것이기 때문에 잘못 설정하면 수많은 비용을 들일 수도 있어 주의해야 합니다.
Airflow Webserver를 통해 Backfill을 시각적으로 확인할 수 있습니다.
'Airflow' 카테고리의 다른 글
Airflow: 구조 개요 (0) | 2023.08.17 |
---|---|
Airflow: 데이터를 데이터베이스에 적재하고 주기적으로 갱신하는 파이프라인 만들기 (0) | 2023.08.02 |
Airflow: TaskFlow API의 여러 기능 (0) | 2023.07.23 |
Airflow: TaskFlow API 패러다임으로 DAG 정의 (0) | 2023.07.22 |
Airflow: 설치하기 (0) | 2023.06.26 |