관리 메뉴

CS

Airflow: Operator 본문

Airflow

Airflow: Operator

vcxz 2023. 8. 26. 16:00

Airflow Operators, Airflow Sensors

 

  • Operator
  • Sensor
  • Jinja 템플릿
    • 파이썬 내장 객체로 렌더링하기

 


Operator

오퍼레이터는 사전에 정의된 Task 템플릿입니다. DAG에서 오퍼레이터를 선언하여 task를 만들 수 있습니다.

※ Task와 오퍼레이터는 대부분 상호 교환 가능한 개념입니다. 다만 Task를 말할 때는 DAG의 "실행 단위"를 뜻하고, 오퍼레이터를 말할 때는 재사용 가능한 미리 만들어진 task 템플릿을 가리킵니다.

with DAG("my-dag") as dag:
    ping = SimpleHttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="admin@example.com", subject="Update complete")

    ping >> email

Airflow에는 많은 오퍼레이터가 있습니다. 그중에는 Airflow 내장도 있고 미리 준비된 provider용(MySQL 같은)도 있습니다. bash 명령어를 실행하는 BashOperator, 파이썬 함수를 실행하는 PythonOperator가 대표적입니다.

@task는 보통 오퍼레이터와 다르게 인수로 입력된 jinja 템플릿을 렌더링할 수 없습니다.

 

코어에 내장된 오퍼레이터 목록은 Operators and Hooks Reference에서 볼 수 있습니다.

필요로 하는 오퍼레이터가 Airflow에 기본적으로 포함되어 있지 않다면, provider 패키지를 찾아볼 수 있습니다. 각종 DBMS나 AWS 같은 클라우드 관련 오퍼레이터 등이 있습니다. 전체 목록은 여기서 볼 수 있습니다.

 

Sensor

센서는 특수한 오퍼레이터입니다.

센서는 '어떤 것이 일어나기를 기다리는' 작업만 수행할 수 있습니다. 특정 시간까지 기다리거나, 어떤 파일이 존재할 때까지 기다리거나, 외부 이벤트를 기다릴 수 있습니다. 해당 사건이 일어나면 센서는 success하고 downstream task가 실행되게 됩니다.

 

센서는 기다리는 작업을 수행하기 때문에 주로 idle 상태입니다. 이에 두 가지 실행 모드를 제공하여 보다 효율적으로 사용할 수 있도록 하고 있습니다.

- poke (기본 값): 센서는 실행 시간 동안 계속 worker 슬롯을 차지합니다.

- reschedule: 센서는 어떤 일이 일어났는지 검사(poke라고도 함)할 때만 worker 슬롯을 차지하고, 다음 검사까지 sleep합니다.

 

모드는 센서의 파라미터 mode에 문자열로 넣으면 됩니다. 두 모드 간 트레이드오프는 지연 시간입니다. 매초마다 검사해야 한다면 poke 모드가, 매분 검사해야 한다면 reschedule 모드가 좋을 것입니다.

 

오퍼레이터처럼, Airflow는 내장 및 provider을 통해 다양한 목적을 위한 여러 센서를 미리 준비하고 있습니다.

 

Jinja 템플릿

Airflow는 Jinja 템플릿을 이용하며, macro 기능을 통해 보다 다양한 활용이 가능합니다.

예를 들어 BashOperator에서 환경 변수로 data interval의 시작점을 사용하고 싶다면 다음처럼 할 수 있습니다.

# YYYY-MM-DD 형식으로 data interval 시작점(str) 얻기
date = "{{ ds }}"
t = BashOperator(
    task_id="test_env",
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

{{ ds }}는 data interval의 시작점을 'YYYY-MM-DD' 형식의 문자열로 나타낸 템플릿 변수입니다.

BashOperatorenv는 Jinja 템플릿을 사용할 수 있는 파라미터이므로, 여기에 {{ ds }}를 넣어 환경 변수 DATA_INTERVAL_START에 해당 값을 입력할 수 있습니다.

 

Jinja 템플릿은 문서에 "templated"라고 기재된 파라미터에만 사용할 수 있습니다. 오퍼레이터의 pre_execute 함수가 호출되기 직전에 템플릿 치환이 일어납니다.

 

중첩된(nested) 필드의 경우, 필드가 속한 구조체에서 해당 필드가 template_fileds property에 등록되어 있다면 로 표시되어 있다면 Jinja 템플릿을 사용할 수 있습니다.

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # ...

t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
    dag=dag,
)

MyDataReader 클래스의 property template_fieldspath 필드를 등록하여, path에 Jinja 템플릿을 사용할 수 있습니다.

template_fields는 클래스 변수이고, 리스트나 문자열 튜플 같은 Sequence[str]을 받습니다.

 

여러 번 중첩된 필드도 Jinja 템플릿을 사용 가능한데, 이 경우 중간 필드 역시 template_fields에 등록되어야 합니다.

class MyDataTransformer:
    template_fields: Sequence[str] = ("reader",)

    def __init__(self, my_reader):
        self.reader = my_reader

    # ...


class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # ...


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
    dag=dag,
)

MyDataReaderpath에 Jinja 템플릿을 사용하였고, 이 클래스의 결과물을 받는 MyDataTransformerreader에도 Jinja 템플릿을 받을 수 있도록 만들었습니다.

 

DAG를 만들 때, Jinja 설정인 Environment를 구성할 수 있습니다.

자주 사용되는 옵션은 Jinja가 끝에 오는 개행을 제거하지 않도록 하는 것입니다.

my_dag = DAG(
    dag_id="my-dag",
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
    },
)

가능한 옵션은 Jinja 문서에서 확인하세요.

파이썬 내장 객체로 렌더링하기

기본적으로 모든 template_fields는 문자열로 렌더링됩니다.

예를 들어 extract라는 task가 {"1001": 301.27}이라는 딕셔너리를 XCom 테이블에 push할 경우, 다른 task에서 이를 pull해 Jinja로 렌더링하면 '{"1001": 301.27}'처럼 문자열이 됩니다.

transform = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
    python_callable=transform,
)

문자열 대신 딕셔너리 같은 파이썬 내장 객체로 렌더링하고 싶다면, DAG에 render_template_as_native_obj=True를 주어야 합니다.

dag = DAG(
    dag_id="example_template_as_python_object",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    # 파이썬 내장 객체로 렌더링
    render_template_as_native_obj=True,
)


@task(task_id="extract")
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    # 문자열을 json으로 로드(이 경우 dict가 됨)
    return json.loads(data_string)


@task(task_id="transform")
def transform(order_data):
    print(type(order_data))
    for value in order_data.values():
        total_order_value += value
    return {"total_order_value": total_order_value}


extract_task = extract()

transform_task = PythonOperator(
    task_id="transform",
    # dict로 불러와짐
    op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
    python_callable=transform,
)

extract_task >> transform_task

이는 Jinja의 Environment 대신 NativeEnvironment를 사용하는 것입니다.

 

 

 

'Airflow' 카테고리의 다른 글

Airflow: Celery Executor를 사용하여 클러스터로 DAG 실행하기  (0) 2023.09.05
Airflow: TaskFlow  (0) 2023.08.31
Airflow: Task  (0) 2023.08.24
Airflow: DAG Run  (0) 2023.08.21
Airflow: DAG  (0) 2023.08.21