관리 메뉴

CS

Airflow: Task 본문

Airflow

Airflow: Task

vcxz 2023. 8. 24. 05:57

Airflow Tasks

 

  • Task 간 관계(의존성)
  • Task Instance
  • Timeout
  • SLA
  • Exception
  • 좀비/언데드 Task
  • Task별 Executor 구성

 


Task는 Airflow에서의 실행 기초 단위입니다.

task는 DAG에 포함되고, 실행 순서를 나타내기 위해 task 간 upstream/downstream 의존성을 갖습니다.

 

task에는 세 종류가 있습니다.

- Operator(오퍼레이터): 사전 정의된 task 템플릿으로, DAG의 대부분을 차지합니다.

- Sensor(센서): 오퍼레이터의 특별한 서브클래스로, 외부 이벤트가 일어날 때까지 기다립니다.

- TaskFlow 데코레이터 @task: 자작 파이썬 함수를 task로 만듭니다.

위 모두는 BaseOperator의 서브클래스입니다.

그리고 task와 오퍼레이터는 어떤 의미에서 상호 교환가능하지만, 구별된 개념으로 생각하는 게 유용합니다. 본질적으로 오퍼레이터와 센서는 템플릿이고 DAG 파일에서 그것들을 호출하면 task가 만들어 집니다.

 

Task 간 관계(의존성)

task를 사용할 때 task 간 관계를 설정하는 것이 중요합니다. 이 관계가 의존성인데, Airflow에서는 upstream/downstream task라고 합니다. 우선 task를 선언한 후 그것들의 의존성을 선언합니다.

※ upstream task는 다른 task의 바로 전에 있는 task를 말합니다. 예전에는 이를 parent task라고 불렀습니다. 바로 전에 있는 task만을 가리키므로 그것보다 더 전에 있는 task는 upstream task가 아닌 것을 유의해야 합니다. downstream도 마찬가지입니다.

 

의존성을 선언하는 방법은 두 가지입니다.

# >>와 << (bitshift) 오퍼레이터
first >> second >> [third_1, third_2]

# 좀 더 명백한 표현 set_upstream/downstream 메소드
first.set_downstream(second)
third.set_upstream(second)

둘 다 똑같은 작업을 수행하지만, 많은 경우 bitshift가 더 읽기 쉽습니다.

 

기본적으로 task는 upstream task가 모두 성공해야 실행됩니다. 다만 branching하거나, 특정 upstream task만 확인하거나, 기록상 이전/이후의 실행 결과에 따라 실행 여부를 정하는 등 여러 옵션이 있습니다. 이에 대하여는 Control Flow 문서를 참고하세요.

 

task는 기본적으로 다른 task에게 정보를 넘기지 않고 완전히 독립적으로 실행됩니다. task가 다른 task에게 정보를 넘기게 만드려면 XCom을 사용해야 합니다.

 

Task Instance

DAG가 실행될 때마다 DAG Run이라는 인스턴스가 만들어지는 것처럼, DAG 내에 있는 task는 task instacne로 인스턴스화됩니다.

 

task instacne는 주어진 DAG(및 주어진 data interval)를 위한 task의 특정 실행을 말합니다.

task instacne는 그것의 라이프사이클의 어느 부분에 와 있는지 나타내는 '상태'를 갖습니다.

 

다음은 상태의 종류입니다.

- none: 실행 대기열에 포함되지 않음(의존성이 만족되지 않음)

- scheduled: scheduler가 의존성이 만족되었다고 판단하였고, 곧 실행될 것임

- queued: executor에 연결되었고 worker를 기다림

- running: worker에서 실행 중(또는 로컬/동기식 executor에서 실행 중)

- success: 오류 없이 성공함

- shutdown: 실행 중에 외부에서 종료를 요청받음

- restarting: 실행 중에 외부에서 재시작을 요청받음

- failed: 오류가 발생하여 실패함

- skipped: branching, LatestOnly 등에 의해 스킵됨

- upstream_failed: upstream task가 실패하였는데 trigger rule이 이를 필요로 하는 경우

- up_for_retry: 실패하였지만 재시도 횟수가 남아 있어 reschedule을 기다리는 중

- up_for_reschedule: reschedule 모드인 센서에서 reschedule을 기다리는 중

- deferred: deferrable(연기 가능한) 오퍼레이터에서 트리거를 기다리는 중

- removed: 실행 중 DAG에서 task가 제거됨

task 라이프사이클

이상적인 상황은 none -> scheduled -> queued -> running -> success일 것입니다.

 

자작 task(오퍼레이터)가 실행될 때, 그것에 전달된 task(BaseOperator 같은) instance의 복사본을 갖게 되어, task 메타데이터나 XCom 메소드를 사용할 수 있습니다.

관계 용어에 대하여

task instance에는 두 종류의 관계가 있습니다.

 

우선 upstream/downstream입니다. DAG가 실행되면, DAG에 있는 모든 task에 대한 instance를 생성하는데, 이 task instance들은 모두 같은 data interval을 갖습니다.

 

반면 동일한 DAG지만 다른 DAG run에 있는 task와는 다른 data interval을 갖습니다. 이 task instance와의 관계는 previous/next라고 합니다. 이 차이를 구분할 필요가 있습니다. 예전 문서에는 upstream과 previous가 혼재하는 경우가 있어 주의가 필요합니다.

 

Timeout

task 실행에 제한 시간을 두고 싶다면, task 속성 execution_timeoutdatetime.timedelta 값을 넣을 수 있습니다. 이는 센서에도 가능합니다. execution_timeout을 초과하면 AirflowTaskTimeout이 발생합니다.

 

추가로, 센서에는 timeout이라는 파라미터가 있습니다. 이는 reschedule 모드에서만 유의미합니다. 이 timeout은 센서가 언제까지 활동할 것인지를 정하는 것입니다. timeout을 초과하면 AirflowSensorTimeout이 발생하고 센서는 재시도하지 않고 바로 실패합니다.

 

아래 SFTPSensor를 통해 이를 알아봅시다.

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),
    timeout=3600,
    retries=2,
    mode="reschedule",
)

이 센서는 reschedule 모드로, 실행되었다가 일정 시간이 지나면 종료 후, 성공할 때까지 주기적으로 reschedule됩니다.

- execution_timeout이 60초: 센서가 poke를 할 때마다 60초 제한을 갖습니다.

- 60초가 지나면 AirflowTaskTimeout이 발생합니다. 센서의 retries가 2이므로 2회까지 재시도됩니다.

- timeout이 3600초: 센서의 첫 실행부터 성공할 때까지 3600초 제한을 갖습니다. 즉 3600초 동안 성공하지 못하면 AirflowSensorTimeout이 발생합니다. 이 경우 재시도하지 않습니다.

- 센서가 3600초 내에 다른 외부적 이유로 실패할 경우, retries=2이므로 2회까지 재시도됩니다. 재시도되더라도 timeout 시간을 초기화하지 않습니다.

간단히 말하면, timeoutreschedule 센서 자체의 시간 제한이고, execution_timeout은 센서의 poke에 걸린 시간 제한이며, timeoutretries는 무관합니다.

 

시간 제한을 두고 싶지만, 시간을 초과하여도 일단 계속 실행되게 만들고 싶다면 SLA를 사용할 수 있습니다.

 

SLA (Service Level Agreement)

DAG run 시작 시간에서부터 task가 언제까지 성공해야 바람직한지 정하는 걸 SLA라고 합니다. task 실행이 SLA보다 더 오래 걸릴 경우, UI의 "SLA Misses"에서 이를 확인할 수 있으며, 해당 task 정보를 email로 보낼 수 있습니다.

 

timeout과 다르게, SLA를 miss한 task는 취소되지 않습니다.

 

SLA를 비활성화하고 싶으면 구성 파일의 [core] check_slas = False하면 됩니다.

※ 수동 트리거한 task와 이벤트 기반 DAG는 SLA를 사용하지 않습니다.

 

SLA를 설정하려면 task의 sla 파라미터에 datetime.timedelta 값을 주면 됩니다. 또한 sla_miss_callback 파라미터에 SLA가 miss할 경우 실행할 함수(알림 등)를 넣을 수 있습니다.

sla_miss_callback

sla_miss_callback에 들어갈 함수는 아래 5개 파라미터를 받아야 합니다.

- dag: SLA를 miss한 task가 포함된 DAG run의 DAG 객체

- task_list: SLA를 miss한 모든 task의 리스트(string, \n으로 분리됨)

- blocking_task_list: sla_miss_callback이 실행되었을 때, (SLA를 miss한 task와 동일한 logical date를 가진) DAG run에서 아직 성공하지 않은 모든 task 리스트

- slas: task_list에 있는 task와 연결된 SlaMiss 객체 리스트

- blocking_tis: blocking_task_list에 있는 task와 연결된 task instance 객체 리스트

def my_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    ...

DAG 레벨로 sla_miss_callback을 정하고 task 레벨로 sla를 정하는 등으로 사용할 수 있습니다.

 

Exception

자작 task/오퍼레이터에서 task 상태를 제어하고 싶다면 다음 두 특별 예외를 발생시킬 수 있습니다.

- AirflowSkipException: 현재 task를 skipped로 표시

- AirflowFailException: 현재 task를 failed로 표시하고 남은 재시도 횟수 무시

이는 사용할 수 있는 데이터가 없거나, API 키가 유효하지 않는 등(재시도가 무의미한 상황) 코드에 스킵 또는 실패 조건을 빨리 판단할 수 있는 정보가 있을 때 유용합니다.

 

좀비/언데드 Task

task instance는 한 번 죽을 것으로 예상됩니다. 이에 반하는 상황이 있습니다.

- 좀비 task는 실행 중에 갑자기 죽은 task입니다. 프로세스가 죽거나, 기계가 멈춘 경우에 발생할 수 있습니다. Airflow는 이를 주기적으로 찾아 청소하고 설정에 따라 실패시키거나 재시도합니다.

- 언데드 task는 실행 계획에 없는데 실행된 task입니다. UI에서 수동으로 task instance를 조작할 때 주로 발생합니다. Airflow는 주기적으로 이를 찾아 멈춥니다.

 

Task별 Executor 구성

KubernetesExecutor 같은 특정 executor의 경우, task별로 executor을 구성할 수 있습니다.

이는 task/오퍼레이터의 executor_config 파라미터로 설정할 수 있습니다.

MyOperator(...,
    executor_config={
        "KubernetesExecutor":
            {"image": "myCustomDockerImage"}
    }
)

 

 

 

'Airflow' 카테고리의 다른 글

Airflow: TaskFlow  (0) 2023.08.31
Airflow: Operator  (0) 2023.08.26
Airflow: DAG Run  (0) 2023.08.21
Airflow: DAG  (0) 2023.08.21
Airflow: 구조 개요  (0) 2023.08.17