일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- airflow
- dynamic task mapping
- executor
- XCOM
- authoring
- mysql
- SecretsManager
- Operator
- dagrun
- ci/cd
- Scheduling
- Concept
- testing
- sqs
- kinesis
- TaskFlow
- pipeline
- git book
- DAG
- credentials
- github actions
- S3
- boto3
- Git
- AWS
- Data Firehose
- celery
- RDS
- lambda
- Task
- Today
- Total
CS
Airflow: TaskFlow API 패러다임으로 DAG 정의 본문
Airflow 2.0부터 TaskFlow API가 도입되어 해당 방식으로 DAG를 정의할 수 있습니다.
TaskFlow API를 사용하면 DAG나 오퍼레이터 객체를 생성할 필요 없이, Task를 구현한 함수에 필요한 데코레이터만 붙이면 DAG를 만들 수 있어 문법이 간단해집니다.
(Airflow 페이지의 Working with TaskFlow 문서를 따릅니다.)
TaskFlow로 정의된 DAG
예제 스크립트를 보며 TaskFlow API의 형식을 알아보겠습니다.
스크립트는 airflow.example_dags.tutorial_taskflow_api.py입니다.
DAG 인스턴스 생성
import json
import pendulum
from airflow.decorators import dag, task
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_taskflow_api():
라이브러리에서 필요한 데코레이터를 임포트하고, DAG로 사용할 함수에 dag 데코레이터를 답니다.
DAG 객체를 생성할 때처럼 dag 데코레이터를 설정할 수 있습니다. dag_id를 따로 정하지 않으면 기본적으로 함수 이름이 DAG ID가 됩니다.
여기서는 DAG 함수 안에 Task 함수를 두게 됩니다.
Task 생성
@task()
def extract():
"""
#### Extract task
JSON 문자열을 하드코딩하는 것으로 대신하는 데이터 추출 Task
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
받은 데이터의 값을 합하는 데이터 변환 Task
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
"""
#### Load task
데이터를 출력하는 것으로 대신하는 데이터 적재 Task
"""
print(f"Total order value is: {total_order_value:.2f}")
각각 데이터를 E, T, L하는 Task들을 만듭니다.
수행할 작업이 담긴 함수에 task 데코레이터를 답니다. 이는 실행 시 파이썬 오퍼레이터에 해당 함수를 넣은 것과 동일하게 작동합니다.
기존 방법에서 Task 객체를 정의할 때처럼 task 데코레이터의 인자를 넣어 설정할 수 있습니다. task_id를 따로 정하지 않으면 기본적으로 함수 이름이 Task ID가 됩니다.
extract()의 결과가 transform()에서, transform()의 결과가 load()에서 쓰입니다.
transform()에서 multiple_outputs=True로 하였는데, 주로 Task가 dict형 값을 반환하는 경우 사용하는 설정으로 활성화하면 dict를 쪼개 여러 개의 XCom 변수를 생성합니다. dict의 key가 XCom key가 됩니다. 참고로 Task가 dict형 값을 반환하는 경우 multiple_outputs는 자동적으로 활성화됩니다.
DAG 플로우 정의
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_taskflow_api()
함수로 정의한 Task를 실행하는 단계입니다.
TaskFlow에서는 Task 간 데이터 전달에서 XCom을 명시할 필요 없이, 함수를 호출하면 반환 값을 가져옵니다. 이는 Task가 여러 노드의 여러 worker에서 실행되는 경우도 마찬가지이므로 훨씬 편리합니다.
함수 수행 순서를 보면 extract() >> transfrom() >> load() 로 실행될 것을 알 수 있습니다.
그리고 마지막으로 dag 함수를 실행시켜 이 스크립트가 DAG로서 작동할 수 있도록 만듭니다.
기존 방식으로 정의된 DAG
위와 같은 작업을 수행하지만 기존 방식으로 정의한 DAG의 일부를 보며 두 형식을 비교해보겠습니다.
스크립트는 airflow.example_dags.tutorial_dag.py입니다.
Task 간 데이터 전달
def transform(**kwargs):
ti = kwargs["ti"]
extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
order_data = json.loads(extract_data_string)
total_order_value = 0
for value in order_data.values():
total_order_value += value
total_value = {"total_order_value": total_order_value}
total_value_json_string = json.dumps(total_value)
ti.xcom_push("total_order_value", total_value_json_string)
변환 Task 함수입니다.
context의 task instance에서 XCom 변수를 pull하여 추출 Task의 결과를 가져오는 것이 명시되어 있습니다.
또한 본 Task의 결과를 다시 task instance에 XCom 변수로 push해 이후 적재 Task에서 쓰도록 합니다.
TaskFlow 방식에서도 내부적으로는 이와 동일하게 XCom 변수 전달이 일어나지만 추상화되어 겉으로 드러나지 않습니다.
만약 TaskFlow를 쓸 때 디버그나 모니터링이 필요하다면, Airflow UI를 통해 XCom 변수를 확인할 수 있습니다.
Task 종속성 정의
transform_task = PythonOperator(
task_id="transform",
python_callable=transform,
)
extract_task >> transform_task >> load_task
기존 방식에서는 함수를 오퍼레이터에 넣어 오퍼레이터 객체를 만들고 이를 사용해 Task 종속성을 따로 정의해야 합니다.
반면 TaskFlow 방식에서는 Task 함수를 작동시키기만 하면 자동적으로 종속성을 생성합니다.
'Airflow' 카테고리의 다른 글
Airflow: 구조 개요 (0) | 2023.08.17 |
---|---|
Airflow: 데이터를 데이터베이스에 적재하고 주기적으로 갱신하는 파이프라인 만들기 (0) | 2023.08.02 |
Airflow: TaskFlow API의 여러 기능 (0) | 2023.07.23 |
Airflow: DAG 형식 살펴보기 (0) | 2023.06.27 |
Airflow: 설치하기 (0) | 2023.06.26 |