관리 메뉴

CS

Airflow: XCom & 변수 본문

Airflow

Airflow: XCom & 변수

vcxz 2023. 9. 14. 20:18

XCom

 

XComs — Airflow Documentation

 

airflow.apache.org

XCom (cross-communication)은 task 간 데이터를 전달할 수 있는 메커니즘입니다. Airflow는 여러 기기에 걸쳐 실행될 수 있고, 이때 task는 다른 task와 완전히 독립적으로 실행되기 때문에 이런 메커니즘을 사용합니다.

 

XCom은 key (본질적으로 해당 XCom의 이름)와 해당 XCom이 발생한 task_iddag_id로 식별합니다.

XCom은 (직렬화된) 값을 가지는데, 원칙적으로 작은 데이터만 가능합니다. 데이터프레임 같은 큰 데이터를 전달하는데 사용하지 않는 게 좋습니다.

 

XCom은 오퍼레이터나 task instance 클래스의 xcom_pushxcom_pull을 사용하여 명시적으로 task instance에 있는 저장소에 "push"되고 "pull"됩니다.

많은 오퍼레이터가 실행 결과 값을 return_value를 key로 하여 자동적으로 push합니다(오퍼레이터 파라미터 do_xcom_push=True일 경우(기본 값)). @task 함수도 마찬가지입니다.

 

xcom_pull 메소드는 key가 전달되지 않으면 기본적으로 key=return_value로 동작합니다.

# 'pushing_task'라는 task의 결과 값을 가져옴
value = task_instance.xcom_pull(task_ids='pushing_task')

그리고 XCom은 템플릿으로도 사용할 수 있습니다.

# XCom으로 데이터베이스 테이블명을 넘김
SELECT * FROM {{ task_instance.xcom_pull(task_ids='my_task', key='my_table') }}

XCom은 Airflow의 변수 기능과 유사하다고 할 수 있습니다. 둘의 차이는 XCom은 task instance 단위로 작동하며 한 DAG Run에서 task 간 소통을 위해 만들어졌다면, 변수는 global하게 작동하며 Airflow 전체적으로 구성과 값을 전달하는 데 사용한다는 점입니다.

 

※ DAG의 첫 번째 task가 성공하지 않았을 경우, 해당 task를 재시도할 때마다 해당 DAG Run의 전체 XCom이 clear되어 task 멱등성을 유지합니다.

 

 


변수

 

Variables — Airflow Documentation

 

airflow.apache.org

변수는 런타임에 종속적인 구성(configuration) 같은 개념입니다.

키-값 쌍을 global로 저장하여, 여러 task에서 쿼리하여 사용할 수 있습니다.

변수는 Airflow web UI에서 쉽게 생성 가능하고, JSON 파일을 업로드하여 bulk로 생성할 수도 있습니다.

Airflow UI > Admin > Variables

변수는 CLI의 환경 변수를 통해 생성할 수 있습니다.

환경 변수명 규칙은 'AIRFLOW_VAR_{변수명}'입니다. 이때 변수명은 대문자로 적어야 합니다.

# 키가 FOO고 값이 BAR인 변수 생성
export AIRFLOW_VAR_FOO=BAR

# 값에 JSON을 사용하려면 JSON 문자열 형식으로 저장함
export AIRFLOW_VAR_FOO_BAZ='{"hello":"world"}'

※ 환경 변수를 통해 변수를 생성할 때 밑줄("_")을 하나만 씁니다. 이는 airflow.cfg에 있는 구성 항목들을 설정할 때 밑줄을 두 개 쓰는 것과 구분됩니다.

※ 환경 변수를 통해 생성한 변수는 Airflow UI에 나타나지 않습니다. 그리고 이러한 변수는 Airflow UI를 통해 생성한 변수보다 우선합니다.

 

DAG를 정의할 때, Variable 모델을 임포트하여 get 메소드를 사용해 값을 불러올 수 있습니다.

from airflow.models import Variable

foo = Variable.get("foo")
# deserialize_json=True: JSON 값인 경우 역직렬화함
bar = Variable.get("bar", deserialize_json=True)
# default_var: 해당 키의 변수가 없을 경우 반환할 값
baz = Variable.get("baz", default_var=None)

변수는 템플릿의 var로도 사용할 수 있습니다.

# 보통 값인 경우
echo {{ var.value.<변수명> }}
# JSON 값인 경우 역직렬화함
echo {{ var.json.<변수명> }}

변수는 global하고, Airflow 구성 전체적으로 의미가 있도록 사용해야 합니다. 오퍼레이터/task 간 데이터 전달을 위해서는 XCom을 사용하세요.

 

또한 DAG 파일이 필요한 구성 값 대부분을 포함하도록 만드는 게 좋습니다. 이렇게 하면 버전 관리가 쉬워집니다. 변수는 런타임에 종속적인 값에만 사용하세요.