일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- AWS
- airflow
- SecretsManager
- testing
- executor
- boto3
- ci/cd
- lambda
- kinesis
- Scheduling
- TaskFlow
- pipeline
- github actions
- DAG
- Concept
- mysql
- credentials
- Operator
- Git
- dynamic task mapping
- S3
- git book
- XCOM
- sqs
- RDS
- celery
- Task
- dagrun
- Data Firehose
- authoring
- Today
- Total
CS
Airflow: Executor - (1) 개요 및 로컬 Executor 본문
Executor — Airflow Documentation
airflow.apache.org
- Executor 개요
- Executor 종류
- 로컬 Executor
- dag.test()로 DAG 테스트
- LocalExecutor
- SequentialExecutor
Executor 개요
Executor는 task instance가 무엇으로 실행될지에 관한 메커니즘입니다. executor는 일반적인 API를 가지며 "pluggable"하므로 필요에 따라 executor를 변경할 수 있습니다.
Airflow는 한 번에 한 executor만 다룰 수 있습니다. 이는 구성 파일의 [core]에서 executor 옵션으로 설정할 수 있습니다.
내장 executor는 이름을 기재하면 됩니다.
[core]
executor = KubernetesExecutor
사용자 정의 executor를 사용할 때는 전체 경로를 입력해야 합니다.
[core]
executor = my_company.executors.MyCustomExecutor
CLI에서 현재 설정된 executor를 확인하고 싶으면 다음 명령어를 입력합니다.
airflow config get-value core executor
Executor 종류
Executor에는 로컬에서(scheduler 프로세스에서) 돌아가는 것과 원격으로(보통 worker pool에서) 돌아가는 것이 있습니다.
Airflow는 기본적으로 SequentialExecutor로 설정되어 있는데, 이는 로컬 executor이고 가장 안전한 옵션이지만, 한 번에 하나의 작업밖에 수행하지 못합니다. 따라서 소규모의 단일 기기에서도, 여러 기기 또는 클라우드에서 원격 executor로도 사용할 수 있는 LocalExecutor로 변경하는 것을 추천합니다.
※ executor는 개별적인 프로세스를 갖는 게 아니라 scheduler 프로세스 내부에서 실행됩니다.
로컬 Executor
dag.test()로 DAG 테스트
IDE에서 DAG를 디버그하려면, DAG 파일에 dag.test 명령어를 넣고 DAG 파일을 실행하면 됩니다.
이 방법은 SQLite를 포함한 어떤 데이터베이스에서도 가능하고, 단일 프로세스로 돌아가므로 fail fast(필요한 task 하나가 실패하면 바로 DAG 실패 처리)됩니다.
dag.test를 설정하려면, DAG 파일 마지막에 아래 두 줄을 넣으면 됩니다.
if __name__ == "__main__":
# <DAG ID>.test()
dag.test()
dag.test에 execution_date나 run_conf 같은 인수를 넣어 필요한 시도를 해볼 수 있습니다.
dag.test는 executor를 필요로 하지 않고, scheduler를 통하지 않으며 backfill을 시도하지 않으므로 이전에 사용하던 DebugExecutor보다 좋습니다.
LocalExecutor
LocalExecutor는 다양한 모드에 따른 조절된 방식으로 프로세스를 생성하여 task를 실행합니다.
LocalExecutor의 기반인 BaseExecutor는 인수 parallelism을 받는데, 이로 생성할 프로세스 수를 통제할 수 있습니다. parallelism을 0으로 하면 프로세스 수에 제한을 두지 않습니다. BaseExecutor는 구성 파일의 [core]의 parallelism을 가져오므로 해당 값을 수정하면 됩니다.
이에 따라 LocalExecutor는 다음 두 가지 방식으로 구현될 수 있습니다.
무제한 Parallelism (parallelism == 0)
execute_async가 호출될 때마다 프로세스를 생성하고 task 하나를 할당하여 실행하도록 합니다. 실행을 완료하면 결과를 result_queue에 전달하고 프로세스를 제거합니다. task당 새로운 프로세스가 할당되므로 task_queue가 필요 없습니다. 여기서 사용되는 프로세스는 LocalWorker 클래스에 속합니다.
제한된 Parallelism (parallelism > 0)
start를 실행할 때 parallelism과 동일한 수의 프로세스를 생성합니다. task_queue를 사용하여 준비된 worker에 task를 분배합니다. worker 프로세스는 계속 실행되다가 LocalExecutor가 executor를 멈추라는 명령을 받으면 poison token(poison pill)을 worker에 보내 제거합니다. 여기서 사용되는 프로세스는 QueuedLocalWorker 클래스에 속합니다.
SequentialExecutor는 parallelism이 1인 LocalExecutor이라 할 수 있습니다. 이 옵션을 잘 사용하면 로컬에서 돌아가는 executor는 모두 LocalExecutor를 기반으로 구현할 수 있을 것입니다.
SequentialExecutor
SequentialExecutor는 Airflow를 설치하면 기본적으로 설정된 executor입니다. 이는 SQLite와 함께 사용할 수 있는 유일한 executor입니다. SQLite는 하나의 연결만 가능하기 때문입니다. 이 exeuctor는 한 번에 하나의 task만 실행할 수 있습니다. 따라서 프로덕션 환경에서는 사용하지 않는 게 좋습니다. 주로 간단한 시험이나 dag.test() 등의 디버그를 수행할 때 사용합니다.
'Airflow' 카테고리의 다른 글
Airflow: XCom & 변수 (0) | 2023.09.14 |
---|---|
Airflow: Executor - (2) Celery Executor (0) | 2023.09.12 |
Airflow: Celery Executor를 사용하여 클러스터로 DAG 실행하기 (0) | 2023.09.05 |
Airflow: TaskFlow (0) | 2023.08.31 |
Airflow: Operator (0) | 2023.08.26 |