Airflow: DAG Run
DAG Run은 DAG의 인스턴스 객체를 말합니다. DAG가 실행될 때 DAG run이 생성되고 그 안에 있는 task들이 실행됩니다. DAG run의 상태는 그 안에 있는 task의 상태를 따릅니다. 하나의 DAG에 기반한 여러 DAG run이 동시에 독립적으로 실행될 수 있습니다.
- DAG Run 상태
- Cron 프리셋
- Data Interval
- DAG 재실행
- Catchup
- Backfill
- Task 재실행
- 외부 트리거
- 기타
DAG Run 상태
DAG run의 상태는 DAG 실행이 끝났을 때 결정됩니다. DAG 내 모든 task의 상태가 더 이상 변하지 않게 되었을 때(success, failed, skipped 등) DAG run에 상태가 부여됩니다. 이는 DAG의 잎 노드의 상태에 기반합니다.
DAG run의 가능한 최종 상태는 다음 두 가지입니다.
- success: 모든 잎 노드(task instance)가 (success 또는 skipped) 상태일 시
- failed: 잎 노드 하나라도 (failed 또는 upstream_failed) 상태인 경우
즉 DAG run의 상태는 잎 노드의 상태에 좌지우지됩니다.
※ 특수한 trigger rule을 사용하는 경우 의도치 않은 결과가 발생할 수 있습니다. 잎 노드의 trigger rule을 all_done으로 할 경우, 다른 task가 어떤 상태이건 해당 task는 실행되며, 해당 task가 성공하기만 하면 중간에 failed가 있더라도 DAG run 전체가 success 상태가 됩니다. 이를 해결하기 위해서 이른바 watcher task라고 하는, trigger rule을 one_failed로 하며 실행될 경우 항상 실패하는 task를 다른 모든 task의 downstream으로 두는 방식을 사용할 수 있습니다. 한 task라도 실패할 시 watcher task가 트리거되며 실패하는데, watcher task가 잎 노드가 되기 때문에 DAG run의 상태가 failed가 됩니다.
현재 실행 중인 DAG run은 UI 대시보드의 "Running" 탭에서 볼 수 있습니다.
최신 DAG run이 failed인 DAG는 "Failed" 탭에서 볼 수 있습니다.
Cron 프리셋
DAG를 생성할 때 schedule 파라미터에 cron 표현식, datetime.timedelta 객체, cron 프리셋을 넣을 수 있습니다. 좀 더 복잡한 schedule을 원하면 Timetable을 만들 수 있습니다.
cron 프리셋 종류는 Airflow 문서를 참고하세요.
DAG는 schedule에 따라 데이터베이스 백엔드에 DAG run 목록이 기록되고 이에 따라 인스턴스가 만들어 집니다.
Data Interval
각 DAG run은 처리할 시간적 범위인 data interval을 갖습니다. schedule이 @daily인 DAG라면 data interval은 매일 00:00 ~ 24:00가 됩니다.
DAG run은 보통 그것의 data interval이 끝난 후 계획됩니다. 그렇게 해야 data interval에 포함된 데이터를 모두 다룰 수 있기 때문입니다. 예를 들어 2023-08-15를 대상으로 한 @daily DAG는 2023-08-16 00:00:00 이후에 실행됩니다.
Airflow에 등장하는 모든 date는 data interval 개념에 엮여 있습니다. 예시로 DAG run의 logical date는 data interval의 시작점을 나타내지, DAG가 실제로 실행된 시간을 나타내는 게 아닙니다.
DAG의 start_date도 logical date와 유사한데, 이는 DAG의 첫 번째 data interval의 시작점을 가리키지, DAG의 첫 번째 실행을 가리키는 게 아닙니다. 여기서 DAG run은 start_date에서 data interval이 지난 후부터 계획될 수 있다는 것을 알 수 있습니다.
DAG 재실행
이미 실행된 DAG를 다시 실행할 수 있습니다.
Catchup
DAG에는 start_date를 정해야 하고 end_date도 정할 수 있습니다. 이 기간과 schedule에 따라 각각의 DAG run이 계획되고 실행됩니다. scheduler는 기본적으로 가장 마지막에 실행된 data interval 이후 실행되지 않은 모든 data interval에 대해 DAG run을 실행합니다. 이 개념을 catchup이라고 합니다.
만약 DAG가 catchup을 할 수 없는 내용이라면 catchup을 끄는 게 좋을 겁니다. 이는 DAG 파라미터의 catchup에 False를 주거나, 구성 파일의 catchup_by_default에 False를 기재해 설정할 수 있습니다(기본 값 True). catchup을 끄면 scheduler는 최신 data interval에 대해서만 DAG run을 계획합니다.
예를 들어, start_date가 2020-01-01이고 schedule이 @daily인 DAG를 2023-08-16에 실행한다고 하면,
- catchup=False: 2023-08-15 00:00 ~ 2023-08-16 00:00를 대상으로 총 1번 실행
- catchup=True: 2020-01-01 00:00 ~ 2020-01-02 00:00, ... , 2023-08-15 00:00 ~ 2023-08-16 00:00를 대상으로 총 약 1350번을 순차적으로 실행
catchup은 사용 중이던 DAG를 정지했다가 다시 실행하였을 때도 그 기간에 대하여 수행됩니다.
catchup은 기간으로 구별 가능한 데이터를 다룰 때 유용합니다.
DAG가 자체적으로 catchup 같은 기능을 갖고 있다면 catchup을 끄는 게 좋을 것입니다.
※ cron 표현식과 timedelta 비교
start_date와 catchup에 따라 두 방법은 다르게 작동할 수 있습니다.
2월 1일 01:05에 DAG를 활성화하였을 때, 두 schedule 모두 30분을 주기로 하지만 다르게 계획됩니다.
start date | catchup | */30 * * * * | timedelta(minutes=30) | 설명 |
2월 1일 | True | - 00:00 ~ 00:30 - 00:30 ~ 01:00 |
- 00:00 ~ 00:30 - 00:30 ~ 01:00 |
동일한 결과 |
2월 1일 | False | 00:30 ~ 01:00 | 00:35 ~ 01:05 | cron은 정각을 기준으로 하지만 timedelta는 현재 시간을 기준으로 함 |
2월 1일 00:10 | True | 00:30 ~ 01:00 | 00:10 ~ 00:40 | cron은 00:00 ~ 00:30은 start_date 이전이므로 실행하지 않음 timedelta는 catchup을 실행하고 01:10에 00:40 ~ 01:10 작업을 실행함 |
2월 1일 00:10 | False | 00:30 ~ 01:00 | 00:35 ~ 01:05 | cron은 start_date 상관 없이 정각 기준 timedelta는 현재 시간 기준. 01:35에 01:05 ~ 01:35 작업 실행 |
cron은 반드시 정각을 기준(30분 시 0분과 30분만)으로 하지만 timedelta는 정각과 상관 없이 작동하며 catchup을 사용하지 않을 시 가능한 경우 현재 시간을 data interval의 끝점으로 삼는 걸 알 수 있습니다.
Backfill
DAG를 과거의 특정 기간을 대상으로 실행할 수 있습니다. start_date보다 이전의 기간에 대해서도 가능합니다. 이를 Backfill이라고 합니다.
backfill은 catchup이 꺼져 있어도 가능합니다. backfill은 CLI로 수행할 수 있습니다.
airflow dags backfill \
--start-date START_DATE \
--end-date END_DATE \
dag_id
Task 재실행
DAG run에서 어떤 task는 실패할 수 있습니다. 문제를 고친 후 해당 task를 다시 실행하고 싶으면, 해당 task를 clear하면 됩니다. task instance를 clear하는 것은 task instance 기록을 제거하는 게 아니라, max_tries를 0으로 바꾸고 상태를 None으로 변경하여 task가 재실행되도록 합니다.
UI의 그래프에서 실패한 task를 클릭하고 "Clear" 버튼을 눌러 clear할 수 있습니다.
여기서 선택할 수 있는 옵션이 있습니다.
- Past: 해당 task의 과거 instance도 함께 clear
- Future: 해당 task의 미래 instance도 함께 clear
- Upstream: 해당 DAG에서 해당 task의 upstream task도 함께 clear
- Downstream: 해당 DAG에서 해당 task의 downstream task도 함께 clear
- Recursive: 해당 task의 자식과 부모 DAG에 있는 task도 함께 clear
- Failed: 해당 DAG의 실패한 task만 clear
외부 트리거
DAG run은 수동적으로 생성할 수도 있습니다. CLI에서도 가능하고, UI에서 Trigger 버튼을 눌러서도 가능합니다.
DAG 트리거 시 파라미터 전달
DAG를 트리거할 때 JSON blob으로 파라미터를 전달할 수 있습니다.
이는 jinja template의 dag_run.conf로 접근할 수 있습니다.
기타
- UI에서 task instance를 failed 처리할 수 있습니다. 이로 task instance를 정지시킬 수 있습니다.
- UI에서 task instance를 success 처리할 수 있습니다. 이는 Airflow가 아니라 다른 데 문제가 있는 경우에 사용할 수 있습니다.