Airflow

Airflow: 동적 Task 매핑

vcxz 2023. 9. 22. 19:55

Airflow Dynamic Task Mapping

 

Dynamic Task Mapping — Airflow Documentation

 

airflow.apache.org

Astronomer Create dynamic Airflow tasks

 

Create dynamic Airflow tasks | Astronomer Documentation

How to dynamically create tasks at runtime in your Airflow DAGs.

docs.astronomer.io

 


동적 Task 매핑

"동적"이라는 단어에서 예측할 수 있듯이, DAG 작성자가 DAG 파일을 만드는 시점에 task 내용을 완전히 결정하는 게 아니라, DAG 런타임에 task가 완성되도록 하는 것을 동적 task 매핑이라고 합니다.

동적 task 매핑을 사용하면 런타임에 현재 데이터를 바탕으로 여러 task를 만들어 내 실행할 수 있습니다. DAG 작성 당시에는 알 수 없는 사항(처리해야 할 파일 개수 등)이 있을 시 사용할 수 있습니다.

 

동적 task 매핑은 for 반복문에 task를 넣는 것과 비슷하게 작동하는데, for 반복문을 쓸 경우 작동은 온전히 코드의 몫이지만, 동적 task 매핑을 쓸 경우 scheduler가 upstream task의 반환 값에 따라 처리한다는 점이 다릅니다. 매핑된 task의 실행 차례 직전, scheduler는 각 매핑 인수에 따라 여러 task를 생성합니다.

 

기본 매핑

DAG 파일에서 task의 expand 메소드를 사용하여 list를 인수로 매핑할 수 있습니다.

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="example_dynamic_task_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)

런타임에 add_oneexpand의 키워드 인수 각각에 대해 생성됩니다. 즉 add_one(x=1), add_one(x=2), add_one(x=3)이 실행됩니다. 그리고 add_one의 반환 값인 added_values에는 각 add_one의 반환 값이 list 형태로 들어갑니다. 즉 [2, 3, 4]가 됩니다. 이것이 sum_it에 입력되어 결과적으로 "Total was 9"를 출력합니다.

expand에는 키워드 인수만 써야 합니다.

※ 예시에서는 매핑 task에 sum_it이라는 "reduce" 격인 downstream task가 있지만, 꼭 downstream task가 있어야 되는 건 아닙니다.

Airflow UI add_one 그리드 정보

UI에서 매핑 task는 각각 표시되는 것이 아니라 task 이름 옆에 "[ ]"를 붙여 하나의 task로서 표현됩니다. 각 인스턴스에 대한 정보는 task 정보에서 map index로 구분되어 나타납니다. map index는 입력 값을 기재한 순서를 반드시 따르진 않습니다.

 

※ 매핑 task의 반환 값은 lazy(호출 시 로딩) proxy(가 객체)입니다. 위 예시에서, 매핑 task의 특성 상 add_one이 실행되기 전에는 몇 개의 add_one 인스턴스가 생성될지 알 수 없으므로, add_one의 반환 값 타입은 list가 아니라 그것이 호출될 때 값을 받는 lazy 시퀀스입니다. 이는 LazyXComAccess라는 객체로 표현됩니다.

 

이 객체에 인덱싱 같은 시퀀스 문법을 사용할 수 있고, for 반복문으로 반복할 수 있습니다. 이 객체에 list()를 사용하면 진짜 list로 만들 수 있습니다. 다만 그러면 lazy가 아니라 eager(즉시 로딩)로 작동하기 때문에, 성능적 문제가 없을지 고려해야 합니다.

 

이는 XCom에 proxy를 push할 때도 마찬가지입니다. Airflow는 자동적으로 잘 처리하려고 하지만, 이를 알리기 위해 경고를 출력할 것입니다.

@task
def forward_values(values):
    return values  # lazy proxy

# 경고 출력
"""
Coercing mapped lazy proxy return value from task forward_values to list, which may degrade
performance. Review resource requirements for this operation, and call list() explicitly to suppress this message.
See Dynamic Task Mapping documentation for more information about lazy proxy objects.
"""

# 경고를 띄우지 않으려면 proxy로 내보내지 말고 list로 만들어 내보내야 함
@task
def forward_values(values):
    return list(values)

매핑으로 task 생성하기

위 예시는 DAG 파일에 for 반복문을 쓰는 것과 별다른 차이가 없으나, 동적 task 매핑의 유용성은 task를 반복(iterate)할 수 있다는 점에 있습니다.

@task
def make_list():
    # list나 dict를 반환하는 task면 무엇이든 활용 가능
    return [1, 2, {"a": "b"}, "str"]

@task
def consumer(arg):
    print(arg)

with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())

make_list의 반환 값이 원소가 4개인 list이므로, consumer는 입력 값별로 실행되어 총 4번 실행됩니다.

 

반복 매핑하기

매핑 task의 반환 값을 다시 매핑 task의 입력 값으로 사용할 수 있습니다.

with DAG(dag_id="repeated_mapping", start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    first = add_one.expand(x=[1, 2, 3])
    second = add_one.expand(x=first)

매핑된 add_one의 반환 값을 다시 add_one에 매핑합니다. 결과 값은 [3, 4, 5]가 될 것입니다.

 

매핑하지 않을 인수 입력하기

매핑 task의 일부 인수는 항상 동일하게 유지할 수 있습니다. task의 partial 메소드를 사용하면 됩니다.

partial에는 키워드 인수만 써야 합니다.

@task
def add(x: int, y: int):
    return x + y

added_values = add.partial(y=10).expand(x=[1, 2, 3])
# 다음 인스턴스가 만들어짐
# add(x=1, y=10)
# add(x=2, y=10)
# add(x=3, y=10)

y 값은 항상 10이므로, 결과는 [11, 12, 13]일 것입니다.

이 방법은 커넥션 ID, 데이터베이스 테이블, bucket 이름 등을 쓸 때 유용합니다.

 

여러 인수 매핑하기

여러 인수를 동시에 매핑할 수 있습니다. 이를 수행하면 "벡터곱(외적)"을 만들게 됩니다.

@task
def add(x: int, y: int):
    return x + y

added_values = add.expand(x=[2, 4, 8], y=[5, 10])
# 다음 인스턴스가 생성됨
# add(x=2, y=5)
# add(x=2, y=10)
# add(x=4, y=5)
# add(x=4, y=10)
# add(x=8, y=5)
# add(x=8, y=10)

인스턴스 생성 순서가 반드시 왼쪽에서부터인 것은 아님을 유의해야 합니다.

 

 

TaskFlow가 아닌 경우의 매핑

TaskFlow가 아닌 기존 오퍼레이터에도 partialexpand를 사용할 수 있습니다. task_id, queue, pool과 같은 오퍼레이터에서 항상 동일할 필요가 있는 인수를 partial에 입력할 수 있습니다.

from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import BaseOperator

class AddOneOperator(BaseOperator):
    """입력 값에 1을 더하는 오퍼레이터"""

    def __init__(self, value, **kwargs):
        super().__init__(**kwargs)
        self.value = value

    def execute(self, context):
        return self.value + 1

class SumItOperator(BaseOperator):
    """입력 값을 합하는 오퍼레이터"""

    template_fields = ("values",)

    def __init__(self, values, **kwargs):
        super().__init__(**kwargs)
        self.values = values

    def execute(self, context):
        total = sum(self.values)
        print(f"Total was {total}")
        return total

with DAG(
    dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
    start_date=datetime(2022, 3, 4),
    catchup=False,
):
    # task에 list 매핑
    add_one_task = AddOneOperator.partial(task_id="add_one").expand(value=[1, 2, 3])

    # 매핑 task 반환 값을 "reduce"
    # ".output"을 붙이는 것을 유의
    sum_it_task = SumItOperator(task_id="sum_it", values=add_one_task.output)

기존 오퍼레이터의 반환 값으로 매핑하기

기존 오퍼레이터의 반환 값으로 매핑할 때는 output 속성을 명시해야 합니다.

extract = ExtractOperator(task_id="extract")
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)
load = LoadOperator.partial(task_id="load").expand(input=transform.output)

TaskFlow와 기존 오퍼레이터 함께 사용하기

아래는 S3 bucket에 도착한 데이터에 모두 같은 처리를 하는 DAG 예시입니다.

from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator

with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
    list_filenames = S3ListOperator(
        task_id="get_input",
        bucket="example-bucket",
        prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
    )

    @task
    def count_lines(aws_conn_id, bucket, filename):
        hook = S3Hook(aws_conn_id=aws_conn_id)
        
        return len(hook.read_key(filename, bucket).splitlines())

    @task
    def total(lines):
        return sum(lines)

    counts = count_lines.partial(aws_conn_id="aws_default", bucket=list_filenames.bucket).expand(
        filename=list_filenames.output
    )

    total(lines=counts)

list_filenames의 반환 값별로 count_lines를 실행하고, 모든 count_lines의 반환 값을 total로 "reduce"합니다.

 

키워드 인수를 사용하여 매핑하기

upstream task가 downstream task에 여러 인수를 명시하여 보내야 할 때가 있는데, 이때 task의 expand_kwargs 메소드를 사용할 수 있습니다.

BashOperator.partial(task_id="bash").expand_kwargs(
    [
        {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
        {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
    ],
)

이는 런타임에 두 task instance를 만들며 각각 1과 2를 출력합니다.

 

expand와 비슷하게, expand_kwargs는 dict를 갖는 list를 반환하는 XCom 또는 dict를 반환하는 XCom list도 매핑할 수 있습니다. 이를 사용해 위 S3 예시에 "branching" 과정을 추가하였습니다.

list_filenames = S3ListOperator(...)

@task
def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        dest_bucket_name = "my_text_bucket"
    else:
        dest_bucket_name = "my_other_bucket"
    # dict 반환
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": dest_bucket_name,
    }

# dict를 원소로 하는 list
copy_kwargs = create_copy_kwargs.expand(filename=list_filenames.output)

# 각 원소(dict)별로 오퍼레이터 실행
copy_filenames = S3CopyObjectOperator.partial(
    task_id="copy_files", source_bucket_name=list_filenames.bucket
).expand_kwargs(copy_kwargs)

 

 

Task group 매핑하기

@task_group 함수에 expandexpand_kwargs를 사용하여 task group을 매핑할 수 있습니다.

@task_group
def file_transforms(filename):
    return convert_to_yaml(filename)

file_transforms.expand(filename=["data1.json", "data2.json"])

런타임에 task group file_transforms에 있는 convert_to_yaml이 두 task instance로 생성됩니다. 첫 번째는 data1.json을 입력받고, 두 번째는 data2.json을 입력받게 됩니다.

※ 매핑된 task group 안에 다시 매핑 task를 넣는 것은 현재 불가능합니다.

 

Task group에서의 값 참조

일반 task와 task group의 중요한 차이 중 하나는 task group은 worker에 연결되어 있지 않다는 점으로, task group에 전달된 인수는 task group 내 task에 전달되기 전까지는 진짜 값을 갖지 않습니다. 즉, task group 안에 있는 task가 실행될 때야 값이 의미를 갖게 됩니다.

이에 따라 아래 코드는 의도에 부합하지 못합니다.

@task
def my_task(value):
    print(value)

@task_group
def my_group(value):
    if not value:  # 인수가 아직 task에 도달하지 않았으므로 진짜 값을 가지지 않음
        task_a = EmptyOperator(...)
    else:
        task_a = PythonOperator(...)
    task_a << my_task(value)  # 이때 value가 진짜 값을 가짐

my_group.expand(value=[0, 1, 2])

my_group이 실행될 때, value는 아직 참조일 뿐으로 진짜 값을 갖지 않습니다. 따라서 if 문은 의도대로 작동하지 않습니다. 이후 my_task가 실행될 때 value는 진짜 값(0, 1, 2)를 갖습니다.

 

그러므로 task group의 인수를 사용해 조건문을 쓰려면 @task.branch 같은 task를 사용하고, 반복문을 쓰려면 task 매핑을 사용해야 합니다.

 

깊이 우선 실행

매핑된 task group 내에 여러 task가 있는 경우, 한 인수(들)로 모든 task를 실행한 후 다음 인수(들)로 넘어갑니다.

# 1. task group으로 묶어 실행
@task_group
def file_transforms(filename):
    converted = convert_to_yaml(filename)
    return replace_defaults(converted)

file_transforms.expand(filename=["data1.json", "data2.json"])
"""
convert_to_yaml("data1.json") >> replace_defaults(converted_data1)
convert_to_yaml("data2.json") >> replace_defaults(converted_data2)
별개의 workflow로 수행됨
"""

# 2. 그냥 실행
converted = convert_to_yaml.expand(filename=["data1.json", "data2.json"])
replace_defaults.expand(filename=converted)
"""
[convert_to_yaml("data1.json"), convert_to_yaml("data2.json")] >> [replace_defaults(converted_data1), replace_defaults(converted_data2)]
하나의 workflow이므로, convert_to_yaml("data2.json")이 실패하면 replace_defaults(converted_data1)도 실행되지 않음
"""

 

두 경우 모두 convert_to_yamlreplace_defaults에서 각각 2개의 인스턴스가 생성된다는 점은 같습니다.

반면, 1은 각 인수별로 task group 내 task 전체가 실행된다는 점이 다릅니다. 이 전략을 깊이 우선 실행이라고 합니다. 반면 2는 너비 우선 실행이라고 할 수 있겠죠. 이를 잘 사용하면 좀 더 논리적인 task 분리, 정제된 의존성, 정확한 자원 할당을 할 수 있습니다.

 

매핑된 Task group의 반환 값

매핑 task와 마찬가지로, 매핑된 task group의 반환 값은 통합됩니다.

@task_group
def add_to(value):
    value = add_one(value)
    return double(value)

results = add_to.expand(value=[1, 2, 3])
consumer(results)  # [4, 6, 8]을 받음

 

 

일부 인수 스킵하기

동적 task 매핑을 할 때 특정 인수에 대해서는 task를 수행하지 않도록 정의하고 싶을 수 있습니다. 그러려면 인수로 None이 가도록 만들면 됩니다.

@task
def create_copy_kwargs(filename):
    # 확장자가 json이나 yml이 아닌 파일은 스킵
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        return None
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }

위 task의 반환 값으로 다른 task를 매핑하면, 확장자가 json이나 yml인 경우만 task를 수행할 수 있습니다.

 

 

map과 zip

map: 매핑을 위한 인수 변환

어떤 task의 반환 값으로 다른 task를 매핑하려 할 때, 값의 형태를 변환해야 할 수 있습니다. 특히 기존 오퍼레이터의 경우 반환 값이 정해져 있으므로 더욱 그렇습니다. 이때는 task나 오퍼레이터 outputmap 메소드를 사용하여 값을 변환할 수 있습니다.

from airflow.exceptions import AirflowSkipException

list_filenames = S3ListOperator(...)

# 값 변환 파이썬 함수
def create_copy_kwargs(filename):
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        raise AirflowSkipException(f"skipping {filename!r}; unexpected suffix")
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }

# 오퍼레이터 반환 값에 map을 사용하여 함수를 통해 변환
copy_kwargs = list_filenames.output.map(create_copy_kwargs)

copy_filenames = S3CopyObjectOperator.partial(...).expand_kwargs(copy_kwargs)

map에 넣을 함수는 반드시 task가 아닌 일반 파이썬 함수여야 합니다. 변환은 별개의 task가 아니라 downstream task(여기서는 S3CopyOperator)의 전처리 과정에서 수행됩니다.

또한 함수는 정확히 하나의 위치 인수만 받아야 합니다. map은 파이썬 내장 map()과 유사하게 작동합니다.

함수는 downstream task의 일부로서 실행되기 때문에, task를 만들 때 사용하는 여러 기법을 포함할 수 있습니다. 특정 인수가 입력되는 경우 task를 스킵하고 싶다면 AirflowSkipException을 발생시킬 수 있습니다(여기서는 None을 반환하는 방법이 작동하지 않습니다.).

 

zip: upstream 반환 값 묶기

매핑 task의 인수로 여러 종류의 입력 값을 넣어야 할 수 있습니다. 이는 파이썬 내장 zip()과 유사한 zip 메소드를 통해 가능하며, downstream task의 전처리 과정에서 수행됩니다.

list_filenames_a = S3ListOperator(
    task_id="list_files_in_a",
    bucket="bucket",
    prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = ["rename_1", "rename_2", "rename_3", ...]

filenames_a_b = list_filenames_a.output.zip(list_filenames_b)

@task
def download_filea_from_a_rename(filenames_a_b):
    fn_a, fn_b = filenames_a_b
    S3Hook().download_file(fn_a, local_path=fn_b)

download_filea_from_a_rename.expand(filenames_a_b=filenames_a_b)

위는 S3의 파일을 이름을 변경하여 다운로드하는 예시입니다. list_filenames_a로 얻은 기존 파일명과 list_filenames_b의 값이 tuple로 묶여 각각 쌍을 이루어 download_filea_from_a_rename으로 전달됩니다.

 

zip은 위치 인수(들)을 받으며, 기본적으로 가장 원소가 적은 인수의 수만큼의 tuple의 iterable을 반환합니다. 초과된 원소들은 제외됩니다. zip의 키워드 인수 default에 값을 넣으면, 가장 원소가 많은 인수의 수만큼 tuple의 iterable을 반환하며, 부족한 원소들은 해당 값으로 채워집니다. 이는 파이썬의 itertools.zip_longest와 유사합니다.

 

매핑할 수 있는 자료형

현재는 dict, list 또는 dict나 list가 저장된 XCom으로만 매핑할 수 있습니다.

만약 매핑할 수 없는 자료형을 반환하는 upstream task로 매핑하면 런타임에 UnmappableXComTypePushed 예외가 발생하며 task는 실패합니다.

 

 

매핑에서 템플릿 사용?

오퍼레이터의 인수는 모두 매핑될 수 있지만 템플릿 값을 받지는 못합니다. templated된 속성도 마찬가지입니다.

@task
def make_list():
    return ["{{ ds }}"]

@task
def printer(val):
    print(val)

printer.expand(val=make_list())  # {{ ds }}

이를 보간하고 싶다면 task의 render_template 메소드를 사용하거나 직접 코드를 작성할 수 있습니다.

# context에서 ds를 받아 직접 반환하기
@task
def make_list(ds=None):
    return [ds]

# context에서 task 객체를 얻어 render_template 메소드 사용하기
@task
def make_list(**context):
    return [context["task"].render_template("{{ ds }}", context)]

 

 

매핑 task에 제한두기

두 제한을 둘 수 있습니다.

1. 매핑으로 생성되는 task instance 수 제한

- airflow.cfg[core]max_map_length로 매핑으로 생성되는 task instance 수를 통제할 수 있습니다. 기본 값은 1024입니다. 만약 upstream task가 매핑으로 그 수 이상의 downstream task를 만든다면, 해당 upstream task는 실패합니다.

 

2. 한 번에 실행되는 매핑 task instance 수 제한

- task의 max_active_tis_per_dag 속성으로 한 번에 실행되는 해당 task instance 수를 제어할 수 있습니다. 다만 이는 하나의 DagRun에 국한되지 않고 현재 실행되는 모든 DagRun에 있는 동일한 task에 적용되는 점에 유의해야 합니다.

 

 

매핑 인수가 비어 있을 경우 스킵

매핑 task에 들어가는 인수가 없을 경우(zero-length), task가 생성되지 않고 SKIPPED으로 표시됩니다.