관리 메뉴

CS

Airflow: TaskFlow API 패러다임으로 DAG 정의 본문

Airflow

Airflow: TaskFlow API 패러다임으로 DAG 정의

vcxz 2023. 7. 22. 12:10

 

Airflow 2.0부터 TaskFlow API가 도입되어 해당 방식으로 DAG를 정의할 수 있습니다.

TaskFlow API를 사용하면 DAG나 오퍼레이터 객체를 생성할 필요 없이, Task를 구현한 함수에 필요한 데코레이터만 붙이면 DAG를 만들 수 있어 문법이 간단해집니다.

(Airflow 페이지의 Working with TaskFlow 문서를 따릅니다.)

 

 


TaskFlow로 정의된 DAG

예제 스크립트를 보며 TaskFlow API의 형식을 알아보겠습니다.

스크립트는 airflow.example_dags.tutorial_taskflow_api.py입니다.

DAG 인스턴스 생성

import json
import pendulum
from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():

 

라이브러리에서 필요한 데코레이터를 임포트하고, DAG로 사용할 함수에 dag 데코레이터를 답니다.

DAG 객체를 생성할 때처럼 dag 데코레이터를 설정할 수 있습니다. dag_id를 따로 정하지 않으면 기본적으로 함수 이름이 DAG ID가 됩니다.

여기서는 DAG 함수 안에 Task 함수를 두게 됩니다.

Task 생성

    @task()
    def extract():
        """
        #### Extract task
        JSON 문자열을 하드코딩하는 것으로 대신하는 데이터 추출 Task
        """
        
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        
        return order_data_dict
        
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        받은 데이터의 값을 합하는 데이터 변환 Task
        """
        
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
        
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        데이터를 출력하는 것으로 대신하는 데이터 적재 Task
        """

        print(f"Total order value is: {total_order_value:.2f}")

각각 데이터를 E, T, L하는 Task들을 만듭니다.

수행할 작업이 담긴 함수에 task 데코레이터를 답니다. 이는 실행 시 파이썬 오퍼레이터에 해당 함수를 넣은 것과 동일하게 작동합니다.

기존 방법에서 Task 객체를 정의할 때처럼 task 데코레이터의 인자를 넣어 설정할 수 있습니다. task_id를 따로 정하지 않으면 기본적으로 함수 이름이 Task ID가 됩니다.

extract()의 결과가 transform()에서, transform()의 결과가 load()에서 쓰입니다.

transform()에서 multiple_outputs=True로 하였는데, 주로 Task가 dict형 값을 반환하는 경우 사용하는 설정으로 활성화하면 dict를 쪼개 여러 개의 XCom 변수를 생성합니다. dict의 key가 XCom key가 됩니다. 참고로 Task가 dict형 값을 반환하는 경우 multiple_outputs는 자동적으로 활성화됩니다.

DAG 플로우 정의

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
    
tutorial_taskflow_api()

함수로 정의한 Task를 실행하는 단계입니다.

TaskFlow에서는 Task 간 데이터 전달에서 XCom을 명시할 필요 없이, 함수를 호출하면 반환 값을 가져옵니다. 이는 Task가 여러 노드의 여러 worker에서 실행되는 경우도 마찬가지이므로 훨씬 편리합니다.

함수 수행 순서를 보면 extract() >> transfrom() >> load() 로 실행될 것을 알 수 있습니다.

그리고 마지막으로 dag 함수를 실행시켜 이 스크립트가 DAG로서 작동할 수 있도록 만듭니다.

 

 


기존 방식으로 정의된 DAG

위와 같은 작업을 수행하지만 기존 방식으로 정의한 DAG의 일부를 보며 두 형식을 비교해보겠습니다.

스크립트는 airflow.example_dags.tutorial_dag.py입니다.

Task 간 데이터 전달

def transform(**kwargs):
    ti = kwargs["ti"]
    extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
    order_data = json.loads(extract_data_string)

    total_order_value = 0
    for value in order_data.values():
        total_order_value += value

    total_value = {"total_order_value": total_order_value}
    total_value_json_string = json.dumps(total_value)
    ti.xcom_push("total_order_value", total_value_json_string)

변환 Task 함수입니다.

context의 task instance에서 XCom 변수를 pull하여 추출 Task의 결과를 가져오는 것이 명시되어 있습니다.

또한 본 Task의 결과를 다시 task instance에 XCom 변수로 push해 이후 적재 Task에서 쓰도록 합니다.

TaskFlow 방식에서도 내부적으로는 이와 동일하게 XCom 변수 전달이 일어나지만 추상화되어 겉으로 드러나지 않습니다.

만약 TaskFlow를 쓸 때 디버그나 모니터링이 필요하다면, Airflow UI를 통해 XCom 변수를 확인할 수 있습니다.

Task 종속성 정의

transform_task = PythonOperator(
    task_id="transform",
    python_callable=transform,
)
extract_task >> transform_task >> load_task

기존 방식에서는 함수를 오퍼레이터에 넣어 오퍼레이터 객체를 만들고 이를 사용해 Task 종속성을 따로 정의해야 합니다.

반면 TaskFlow 방식에서는 Task 함수를 작동시키기만 하면 자동적으로 종속성을 생성합니다.