일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- executor
- Scheduling
- airflow
- AWS
- kinesis
- dynamic task mapping
- boto3
- ci/cd
- Concept
- S3
- git book
- authoring
- RDS
- Git
- XCOM
- Operator
- DAG
- Task
- mysql
- lambda
- TaskFlow
- sqs
- testing
- celery
- credentials
- github actions
- dagrun
- SecretsManager
- pipeline
- Data Firehose
- Today
- Total
CS
Airflow: XCom & 변수 본문
XCom
XComs — Airflow Documentation
airflow.apache.org
XCom (cross-communication)은 task 간 데이터를 전달할 수 있는 메커니즘입니다. Airflow는 여러 기기에 걸쳐 실행될 수 있고, 이때 task는 다른 task와 완전히 독립적으로 실행되기 때문에 이런 메커니즘을 사용합니다.
XCom은 key (본질적으로 해당 XCom의 이름)와 해당 XCom이 발생한 task_id 및 dag_id로 식별합니다.
XCom은 (직렬화된) 값을 가지는데, 원칙적으로 작은 데이터만 가능합니다. 데이터프레임 같은 큰 데이터를 전달하는데 사용하지 않는 게 좋습니다.
XCom은 오퍼레이터나 task instance 클래스의 xcom_push와 xcom_pull을 사용하여 명시적으로 task instance에 있는 저장소에 "push"되고 "pull"됩니다.
많은 오퍼레이터가 실행 결과 값을 return_value를 key로 하여 자동적으로 push합니다(오퍼레이터 파라미터 do_xcom_push=True일 경우(기본 값)). @task 함수도 마찬가지입니다.
xcom_pull 메소드는 key가 전달되지 않으면 기본적으로 key=return_value로 동작합니다.
# 'pushing_task'라는 task의 결과 값을 가져옴
value = task_instance.xcom_pull(task_ids='pushing_task')
그리고 XCom은 템플릿으로도 사용할 수 있습니다.
# XCom으로 데이터베이스 테이블명을 넘김
SELECT * FROM {{ task_instance.xcom_pull(task_ids='my_task', key='my_table') }}
XCom은 Airflow의 변수 기능과 유사하다고 할 수 있습니다. 둘의 차이는 XCom은 task instance 단위로 작동하며 한 DAG Run에서 task 간 소통을 위해 만들어졌다면, 변수는 global하게 작동하며 Airflow 전체적으로 구성과 값을 전달하는 데 사용한다는 점입니다.
※ DAG의 첫 번째 task가 성공하지 않았을 경우, 해당 task를 재시도할 때마다 해당 DAG Run의 전체 XCom이 clear되어 task 멱등성을 유지합니다.
변수
Variables — Airflow Documentation
airflow.apache.org
변수는 런타임에 종속적인 구성(configuration) 같은 개념입니다.
키-값 쌍을 global로 저장하여, 여러 task에서 쿼리하여 사용할 수 있습니다.
변수는 Airflow web UI에서 쉽게 생성 가능하고, JSON 파일을 업로드하여 bulk로 생성할 수도 있습니다.
변수는 CLI의 환경 변수를 통해 생성할 수 있습니다.
환경 변수명 규칙은 'AIRFLOW_VAR_{변수명}'입니다. 이때 변수명은 대문자로 적어야 합니다.
# 키가 FOO고 값이 BAR인 변수 생성
export AIRFLOW_VAR_FOO=BAR
# 값에 JSON을 사용하려면 JSON 문자열 형식으로 저장함
export AIRFLOW_VAR_FOO_BAZ='{"hello":"world"}'
※ 환경 변수를 통해 변수를 생성할 때 밑줄("_")을 하나만 씁니다. 이는 airflow.cfg에 있는 구성 항목들을 설정할 때 밑줄을 두 개 쓰는 것과 구분됩니다.
※ 환경 변수를 통해 생성한 변수는 Airflow UI에 나타나지 않습니다. 그리고 이러한 변수는 Airflow UI를 통해 생성한 변수보다 우선합니다.
DAG를 정의할 때, Variable 모델을 임포트하여 get 메소드를 사용해 값을 불러올 수 있습니다.
from airflow.models import Variable
foo = Variable.get("foo")
# deserialize_json=True: JSON 값인 경우 역직렬화함
bar = Variable.get("bar", deserialize_json=True)
# default_var: 해당 키의 변수가 없을 경우 반환할 값
baz = Variable.get("baz", default_var=None)
변수는 템플릿의 var로도 사용할 수 있습니다.
# 보통 값인 경우
echo {{ var.value.<변수명> }}
# JSON 값인 경우 역직렬화함
echo {{ var.json.<변수명> }}
변수는 global하고, Airflow 구성 전체적으로 의미가 있도록 사용해야 합니다. 오퍼레이터/task 간 데이터 전달을 위해서는 XCom을 사용하세요.
또한 DAG 파일이 필요한 구성 값 대부분을 포함하도록 만드는 게 좋습니다. 이렇게 하면 버전 관리가 쉬워집니다. 변수는 런타임에 종속적인 값에만 사용하세요.
'Airflow' 카테고리의 다른 글
Airflow: 동적 Task 매핑 (0) | 2023.09.22 |
---|---|
Airflow: Param (파라미터) (0) | 2023.09.19 |
Airflow: Executor - (2) Celery Executor (0) | 2023.09.12 |
Airflow: Executor - (1) 개요 및 로컬 Executor (0) | 2023.09.12 |
Airflow: Celery Executor를 사용하여 클러스터로 DAG 실행하기 (0) | 2023.09.05 |