Airflow

Airflow: DAG Run

vcxz 2023. 8. 21. 23:31

Airflow DAG Runs

DAG Run은 DAG의 인스턴스 객체를 말합니다. DAG가 실행될 때 DAG run이 생성되고 그 안에 있는 task들이 실행됩니다. DAG run의 상태는 그 안에 있는 task의 상태를 따릅니다. 하나의 DAG에 기반한 여러 DAG run이 동시에 독립적으로 실행될 수 있습니다.

 

  • DAG Run 상태
  • Cron 프리셋
    • Data Interval
  • DAG 재실행
    • Catchup
    • Backfill
    • Task 재실행
  • 외부 트리거
  • 기타

 

 


DAG Run 상태

DAG run의 상태는 DAG 실행이 끝났을 때 결정됩니다. DAG 내 모든 task의 상태가 더 이상 변하지 않게 되었을 때(success, failed, skipped 등) DAG run에 상태가 부여됩니다. 이는 DAG의 잎 노드의 상태에 기반합니다.

 

DAG run의 가능한 최종 상태는 다음 두 가지입니다.

- success: 모든 잎 노드(task instance)가 (success 또는 skipped) 상태일 시

- failed: 잎 노드 하나라도 (failed 또는 upstream_failed) 상태인 경우

즉 DAG run의 상태는 잎 노드의 상태에 좌지우지됩니다.

※ 특수한 trigger rule을 사용하는 경우 의도치 않은 결과가 발생할 수 있습니다. 잎 노드의 trigger rule을 all_done으로 할 경우, 다른 task가 어떤 상태이건 해당 task는 실행되며, 해당 task가 성공하기만 하면 중간에 failed가 있더라도 DAG run 전체가 success 상태가 됩니다. 이를 해결하기 위해서 이른바 watcher task라고 하는, trigger rule을 one_failed로 하며 실행될 경우 항상 실패하는 task를 다른 모든 task의 downstream으로 두는 방식을 사용할 수 있습니다. 한 task라도 실패할 시 watcher task가 트리거되며 실패하는데, watcher task가 잎 노드가 되기 때문에 DAG run의 상태가 failed가 됩니다.

 

현재 실행 중인 DAG run은 UI 대시보드의 "Running" 탭에서 볼 수 있습니다.

최신 DAG run이 failed인 DAG는 "Failed" 탭에서 볼 수 있습니다.

 

Cron 프리셋

DAG를 생성할 때 schedule 파라미터에 cron 표현식, datetime.timedelta 객체, cron 프리셋을 넣을 수 있습니다. 좀 더 복잡한 schedule을 원하면 Timetable을 만들 수 있습니다.

cron 프리셋 종류는 Airflow 문서를 참고하세요.

DAG는 schedule에 따라 데이터베이스 백엔드에 DAG run 목록이 기록되고 이에 따라 인스턴스가 만들어 집니다.

Data Interval

각 DAG run은 처리할 시간적 범위인 data interval을 갖습니다. schedule이 @daily인 DAG라면 data interval은 매일 00:00 ~ 24:00가 됩니다.

DAG run은 보통 그것의 data interval이 끝난 후 계획됩니다. 그렇게 해야 data interval에 포함된 데이터를 모두 다룰 수 있기 때문입니다. 예를 들어 2023-08-15를 대상으로 한 @daily DAG는 2023-08-16 00:00:00 이후에 실행됩니다.

 

Airflow에 등장하는 모든 date는 data interval 개념에 엮여 있습니다. 예시로 DAG run의 logical date는 data interval의 시작점을 나타내지, DAG가 실제로 실행된 시간을 나타내는 게 아닙니다.

DAG의 start_date도 logical date와 유사한데, 이는 DAG의 첫 번째 data interval의 시작점을 가리키지, DAG의 첫 번째 실행을 가리키는 게 아닙니다. 여기서 DAG run은 start_date에서 data interval이 지난 후부터 계획될 수 있다는 것을 알 수 있습니다.

 

DAG 재실행

이미 실행된 DAG를 다시 실행할 수 있습니다.

Catchup

DAG에는 start_date를 정해야 하고 end_date도 정할 수 있습니다. 이 기간과 schedule에 따라 각각의 DAG run이 계획되고 실행됩니다. scheduler는 기본적으로 가장 마지막에 실행된 data interval 이후 실행되지 않은 모든 data interval에 대해 DAG run을 실행합니다. 이 개념을 catchup이라고 합니다.

 

만약 DAG가 catchup을 할 수 없는 내용이라면 catchup을 끄는 게 좋을 겁니다. 이는 DAG 파라미터의 catchup에 False를 주거나, 구성 파일의 catchup_by_default에 False를 기재해 설정할 수 있습니다(기본 값 True). catchup을 끄면 scheduler는 최신 data interval에 대해서만 DAG run을 계획합니다.

 

예를 들어, start_date가 2020-01-01이고 schedule이 @daily인 DAG를 2023-08-16에 실행한다고 하면,

- catchup=False: 2023-08-15 00:00 ~ 2023-08-16 00:00를 대상으로 총 1번 실행

- catchup=True: 2020-01-01 00:00 ~ 2020-01-02 00:00, ... , 2023-08-15 00:00 ~ 2023-08-16 00:00를 대상으로 총 약 1350번을 순차적으로 실행

 

catchup은 사용 중이던 DAG를 정지했다가 다시 실행하였을 때도 그 기간에 대하여 수행됩니다.

catchup은 기간으로 구별 가능한 데이터를 다룰 때 유용합니다.

DAG가 자체적으로 catchup 같은 기능을 갖고 있다면 catchup을 끄는 게 좋을 것입니다.

 

※ cron 표현식과 timedelta 비교

start_date와 catchup에 따라 두 방법은 다르게 작동할 수 있습니다.

2월 1일 01:05에 DAG를 활성화하였을 때, 두 schedule 모두 30분을 주기로 하지만 다르게 계획됩니다.

start date catchup */30 * * * * timedelta(minutes=30) 설명
2월 1일 True - 00:00 ~ 00:30
- 00:30 ~ 01:00
- 00:00 ~ 00:30
- 00:30 ~ 01:00
동일한 결과
2월 1일 False 00:30 ~ 01:00 00:35 ~ 01:05 cron은 정각을 기준으로 하지만
timedelta는 현재 시간을 기준으로 함
2월 1일 00:10 True 00:30 ~ 01:00 00:10 ~ 00:40 cron은 00:00 ~ 00:30은 start_date 이전이므로 실행하지 않음
timedelta는 catchup을 실행하고 01:10에 00:40 ~ 01:10 작업을 실행함
2월 1일 00:10 False 00:30 ~ 01:00 00:35 ~ 01:05 cron은 start_date 상관 없이 정각 기준
timedelta는 현재 시간 기준. 01:35에 01:05 ~ 01:35 작업 실행

cron은 반드시 정각을 기준(30분 시 0분과 30분만)으로 하지만 timedelta는 정각과 상관 없이 작동하며 catchup을 사용하지 않을 시 가능한 경우 현재 시간을 data interval의 끝점으로 삼는 걸 알 수 있습니다.

Backfill

DAG를 과거의 특정 기간을 대상으로 실행할 수 있습니다. start_date보다 이전의 기간에 대해서도 가능합니다. 이를 Backfill이라고 합니다.

 

backfill은 catchup이 꺼져 있어도 가능합니다. backfill은 CLI로 수행할 수 있습니다.

airflow dags backfill \
    --start-date START_DATE \
    --end-date END_DATE \
    dag_id

Task 재실행

DAG run에서 어떤 task는 실패할 수 있습니다. 문제를 고친 후 해당 task를 다시 실행하고 싶으면, 해당 task를 clear하면 됩니다. task instance를 clear하는 것은 task instance 기록을 제거하는 게 아니라, max_tries를 0으로 바꾸고 상태를 None으로 변경하여 task가 재실행되도록 합니다.

 

UI의 그래프에서 실패한 task를 클릭하고 "Clear" 버튼을 눌러 clear할 수 있습니다.

여기서 선택할 수 있는 옵션이 있습니다.

- Past: 해당 task의 과거 instance도 함께 clear

- Future: 해당 task의 미래 instance도 함께 clear

- Upstream: 해당 DAG에서 해당 task의 upstream task도 함께 clear

- Downstream: 해당 DAG에서 해당 task의 downstream task도 함께 clear

- Recursive: 해당 task의 자식과 부모 DAG에 있는 task도 함께 clear

- Failed: 해당 DAG의 실패한 task만 clear

 

외부 트리거

DAG run은 수동적으로 생성할 수도 있습니다. CLI에서도 가능하고, UI에서 Trigger 버튼을 눌러서도 가능합니다.

DAG 트리거 시 파라미터 전달

DAG를 트리거할 때 JSON blob으로 파라미터를 전달할 수 있습니다.

이는 jinja template의 dag_run.conf로 접근할 수 있습니다.

 

기타

- UI에서 task instance를 failed 처리할 수 있습니다. 이로 task instance를 정지시킬 수 있습니다.

- UI에서 task instance를 success 처리할 수 있습니다. 이는 Airflow가 아니라 다른 데 문제가 있는 경우에 사용할 수 있습니다.