Airflow

Airflow: 동적 DAG 생성

vcxz 2023. 10. 16. 18:18

Airflow Dynamic DAG Generation

 

Dynamic DAG Generation — Airflow Documentation

 

airflow.apache.org

 


For 반복문 등을 사용하면, 유사한 구조를 가진 여러 DAG를 DAG 하나하나마다 정의할 필요 없이 생성할 수 있습니다. 이를 동적 DAG 생성이라고 합니다. 이렇게 만들어지는 DAG들은 내부 task의 수가 기본적으로 같다는 점에서 동적 task 매핑과 구별됩니다.

 

 

환경 변수를 통한 DAG 생성

 

코드에서 변수를 사용하고 싶다면, top level에서는 Airflow 변수가 아닌 환경 변수를 써야 합니다. Airflow 변수를 top level에서 사용할 경우 메타데이터 DB에서 값을 가져오기 위하여 connection을 생성하기 때문에 파싱 속도를 늦추고 DB 부하를 늘리기 때문입니다.

 

deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
    task = Operator(param="prod-param")
elif deployment == "DEV":
    task = Operator(param="dev-param")

프로덕션 환경과 개발 환경을 구분하여 사용한다면, DEPLOYMENT라는 환경 변수를 만들어 값으로 프로덕션인 경우 PROD, 개발이면 DEV를 주고, DAG 파일에서 이를 읽은 후 값에 따라 다른 DAG를 생성하도록 할 수 있습니다.

 

 

embeded 메타데이터를 통한 DAG 생성

 

DAG를 생성하는데 필요한 메타데이터를 담은 파일을 두고, DAG 파일에서 이를 임포트하여 DAG를 생성할 수 있습니다.

 

동적으로 생성되는 task에 대한 정보 ALL_TASKS가 담긴 파일 common.py가 있을 때 다음처럼 DAG를 생성할 수 있습니다.

from my_company_utils.common import ALL_TASKS

with DAG(
    dag_id="my_dag",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
):
    for task in ALL_TASKS:
        ...

이때 common.py가 있는 디렉토리인 my_company_utils에는 __init__.py가 있어야 합니다. 이는 파일이 모듈로서 사용될 수 있도록 하기 위함입니다.

또한 .airflowignore 파일에 my_company_utils/.*를 추가하여 scheduler가 이 파일을 확인하지 않도록 해야 합니다.

 

 

구조화된 데이터 파일을 통한 DAG 생성

 

DAG 생성을 위해 보다 복잡한 메타데이터가 필요한 경우 파이썬 파일이 아니라 JSON, YAML 같은 구조화된 데이터 파일로 보관할 수 있습니다.

 

데이터 파일은 DAG 파일이 있는 디렉토리와 같은 곳에 두는 게 찾기가 편합니다. 이 경우 다음 방법으로 위치를 찾고 불러올 수 있습니다.

my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
    configuration = yaml.safe_load(yaml_file)

 

 

동적 DAG의 등록

 

@dag 데코레이터나 with DAG(..) 컨텍스트 매니저를 사용하면 Airflow가 자동적으로 DAG로 등록합니다.

from datetime import datetime
from airflow.decorators import dag, task

configs = {
    "config1": {"message": "first DAG will receive this message"},
    "config2": {"message": "second DAG will receive this message"},
}

for config_name, config in configs.items():
    dag_id = f"dynamic_generated_dag_{config_name}"

    @dag(dag_id=dag_id, start_date=datetime(2022, 2, 1))
    def dynamic_generated_dag():
        @task
        def print_message(message):
            print(message)

        print_message(config["message"])

    dynamic_generated_dag()

위 코드는 dynamic_generated_dag_config1dynamic_generated_dag_config2를 생성합니다. 두 DAG는 서로 다른 DAG로서 동작합니다.

 

 

DAG 파싱 최적화

 

하나의 파일로 많은 DAG를 동적으로 생성하는 경우, DAG가 트리거되어 task를 실행하기 전 일어나는 DAG 파싱으로 인해 task 실행이 늦춰질 수 있습니다.

 

scheduler(정확히는 DAG File Processor)는 DAG 파일을 파싱할 때 거기 있는 모든 DAG를 확인해야 할 것입니다. 그러나 task 실행만을 위해서는 그 task가 속한 하나의 DAG만 보면 됩니다. 따라서 task가 실행될 때는 다른 DAG는 무시하도록 하여 파싱 시간을 단축할 수 있습니다.

 

이 접근 방식은 여러 DAG가 서로 의존성을 가지는 상황 등에서는 사용할 수 없으며 부작용이 있을 수 있습니다.

from airflow.models.dag import DAG
from airflow.utils.dag_parsing_context import get_parsing_context

current_dag_id = get_parsing_context().dag_id

for thing in list_of_things:
    dag_id = f"generated_dag_{thing}"
    if current_dag_id is not None and current_dag_id != dag_id:
        continue  # current_dag_id가 아닌 DAG는 스킵

    with DAG(dag_id=dag_id, ...):
        ...

get_parsing_context를 사용하면 AirflowParsingContext에 접근할 수 있습니다. 해당 컨텍스트의 dag_idtask_id 필드는 DAG File Processor의 DAG 파일 전체 파싱 시에는 None 값을 갖는 반면, task 실행 직전 파싱 시에는 해당하는 DAG ID와 task ID 값을 가집니다. 따라서 위처럼 컨텍스트의 dag_id가 특정 DAG ID인 경우에만 DAG 생성 코드로 이어지게 만든다면 task 실행 전 파싱 시간을 줄일 수 있습니다.

 

 

예시

 

각각 주중에 실행되거나 주말에 실행되는 서로 유사한 DAG를 하나의 DAG 파일로 생성합니다.

from datetime import datetime
from airflow.decorators import dag, task
from airflow.utils.dag_parsing_context import get_parsing_context

# 각 DAG를 위한 메타데이터
configs = {
    "weekday": {"message": "Today is weekday.", "schedule": "0 0 * * 1-5"},
    "weekend": {"message": "Today is weekend.", "schedule": "0 0 * * 0,6"},
}

# 파싱 최적화를 위해 파싱 컨텍스트의 DAG ID 획득
current_dag_id = get_parsing_context().dag_id

# for 반복문을 사용해 동적으로 DAG 생성
for config_name, config in configs.items():
    dag_id = f"dynamic_dag_for_{config_name}"
    # 전체 파싱이 아닌 경우 필요 없는 DAG 스킵
    if current_dag_id is not None and current_dag_id != dag_id:
        continue

    @dag(dag_id=dag_id, start_date=datetime(2023, 10, 10), schedule=config["schedule"])
    def dynamic_dag():

        # DAG별로 다른 task
        task_id = f"{config_name} task"
        @task(task_id=task_id)
        def different_task(message):
            print(message)
        
        # 두 DAG 모두 동일한 task
        @task(task_id="always")
        def same_task():
            print("This is a task that will be always executed.")
        
        different_task(config["message"]) >> same_task()

    dynamic_dag()

 

Airflow UI에서의 모습