일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- AWS
- celery
- pipeline
- lambda
- Task
- kinesis
- dagrun
- github actions
- credentials
- sqs
- authoring
- Data Firehose
- Concept
- executor
- Git
- TaskFlow
- boto3
- RDS
- Operator
- ci/cd
- mysql
- SecretsManager
- airflow
- testing
- git book
- DAG
- S3
- XCOM
- Scheduling
- dynamic task mapping
- Today
- Total
CS
Airflow: Executor - (2) Celery Executor 본문
Celery Executor — Airflow Documentation
airflow.apache.org
원격 executor에는 CeleryExecutor, KubernetesExecutor, DaskExecutor, CeleryKubernetesExecutor, LocalKubernetesExecutor가 있는데, 이중 세팅이 간단한 CeleryExecutor만 알아보도록 하겠습니다.
- Celery Executor
- 구조
- Task 실행 프로세스
- 대기열
Celery Executor
CeleryExecutor는 worker를 scale out할 수 있는 방법 중 하나입니다. Celery 시스템을 사용하여 scheduler 등 Airflow의 프로세스와 여러 worker를, 대기열을 사용하여 연결하는 방식입니다.
이를 사용하려면 Celery 백엔드로 사용할 수 있는 RabbitMQ, Redis 등 브로커를 설치하고, airflow.cfg에서 executor를 CeleryExecutor로 변경해야 합니다.
Celery 브로커를 설치하는 과정 등 Celery에 대한 내용은 Celery 문서를 참고하세요.
Airflow의 Celery Executor 관련 구성 항목은 Airflow 구성 Celery 탭 문서에서 찾을 수 있습니다.
다음은 worker에서 준비해야 할 필수 사항입니다.
- Airflow가 설치되어 있어야 하며, Airflow CLI가 path에 있어야 함
- 전체 cluster에서 Airflow 구성이 동일해야 함(DAGS_FOLDER 경로 등 달라도 상관없는 건 제외)
- 오퍼레이터가 의존성을 충족해야 함. HiveOperator를 사용한다면 Hive CLI가 path에 있어야 하고, MySqlOperator를 쓴다면 관련 파이썬 라이브러리 경로가 PYTHONPATH에 있어야 함.
- 모든 노드가 동일한 DAG에 접근할 수 있어야 함. 각 노드에 동일한 DAG 파일을 준비해도 되고, 어떠한 곳에 DAG 파일을 두고 모든 노드가 접근할 수 있게 만들어도 됨. 주로 사용하는 방법은 Git 리포지토리에 DAG 파일을 두고 Chef, Puppet, Ansible 등을 통해 동기화하는 것임.
worker는 celery 명령어의 서브 명령어로 제어할 수 있습니다.
# worker 실행
airflow celery worker
# worker 중지: Celery process에 SIGTERM을 보냄
airflow celery stop
또한 Celery Flower라는 Celery의 web UI를 사용할 수 있습니다. Flower를 사용하면 worker 정보나 task 결과 등을 볼 수 있습니다.
# Flower 실행
airflow celery flower
Celery Executor를 사용할 때 몇몇 주의사항이 있습니다.
- result backend를 설정해야 함. Airflow 구성의 [celery]의 result_backend로 설정하며 기본 값은 Airflow와 연결된 데이터베이스임.
- Airflow 구성의 [celery_broker_transport_options]의 visibility_timeout이 가장 오래 걸리는 task의 예상 시간보다 짧지 않아야 함. 해당 옵션은 Celery 브로커로 Redis나 SQS를 사용할 때만 활성화되는 옵션으로, 해당 시간까지 worker가 반응하지 않으면 다시 다른 worker로 명령을 내림.
- Celery 브로커로 Redis Sentinel을 사용하며 Redis Server에 비밀번호를 설정한 경우 Airflow 구성의 [celery_broker_transport_options]에 Redis Server 비밀번호를 명시해야 함.
- Airflow 구성의 [celery]의 worker_concurrency를 적절히 정해야 함. 해당 값은 한 worker에서 받을 수 있는 task의 수임.
- 대기열 이름은 256자까지만 가능하나, 브로커마다 다른 제한이 있을 수 있음.
게시글에서 WSL을 사용하여 클러스터를 구성해 간단한 DAG를 실행하는 과정을 볼 수 있습니다.
구조
클러스터는 다음 요소로 구성됩니다.
- Scheduler: 실행해야 할 task를 대기열에 넣음
- Web server: DAG/task의 상태 정보를 제공하는 HTTP Server
- 데이터베이스: DAG/task, 변수, 접속 상태 정보를 저장함
- Celery: 대기열 메커니즘
- Worker: 할당된 task 실행
Celery는 두 가지 요소로 구성됩니다.
- 브로커: 실행 명령어 보관
- Result backend: 마무리한 명령어의 상태 보관
이 요소들은 아래처럼 연결됩니다(숫자는 실행 순서가 아닙니다.).
- 1. Web server > Worker: task 실행 로그를 가져옴
- 2. Web server > DAG 파일: DAG 구조를 가져옴
- 3. Web server > 데이터베이스: task 상태를 가져옴
- 4. Worker > DAG 파일: DAG 구조를 가져오고 task를 실행함
- 5. Worker > 데이터베이스: 접속, 변수, XCom에 대한 정보를 가져오고, 또 저장함
- 6. Worker > Celery result backend: task 상태를 저장함
- 7. Worker > Celery 브로커: 실행 명령어를 저장함
- 8. Scheduler > DAG 파일: DAG 구조를 가져오고 task를 실행함
- 9. Scheduler > 데이터베이스: DAG Run과 관련된 task를 저장함
- 10. Scheduler > Celery result backend: 마무리한 task의 상태를 가져옴
- 11. Scheduler > Celery 브로커: 실행할 명령어를 넣음
Task 실행 프로세스
우선 두 프로세스가 실행 중입니다.
- SchedulerProcess: task를 프로세스하고 CeleryExecutor를 통해 실행함
- WorkerProcess: 대기열에 새로운 task가 들어오는지 관찰함
-- WorkerChildProcess: 새로운 task를 기다림
두 데이터베이스가 있습니다.
- QueueBroker
- ResultBackend
프로세스 중에 두 프로세스가 생성됩니다.
- LocalTaskJobProcess: LocalTaskJob에 따른 프로세스로, RawTaskProcess를 모니터링함. TaskRunner를 사용하여 새로운 프로세스를 시작함
- RawTaskProcess: execute() 등의 코드를 프로세스함
이 프로세스는 다음 순서로 수행됩니다.
[1] SchedulerProcess가 task를 프로세스하고, 수행되어야 하는 task를 QueueBroker로 보냄
[2] SchedulerProcess가 task 상태를 확인하기 위해 ResultBackend를 주기적으로 쿼리하기 시작함
[3] QueueBroker가 WorkerProcess에 task 정보를 보냄
[4] WorkerProcess가 하나의 task를 하나의 WorkerChildProcess에 할당함
[5] WorkerChildProcess가 execute_command()를 통해 task를 실행함. 이는 LocalTaskJobProcess를 생성함.
[6] LocalTaskJobProcess는 LocalTaskJob 클래스를 따름. 이는 TaskRunner를 통해 새로운 프로세스(RawTaskProcess)를 시작함.
[7][8] RawTaskProcess가 코드 실행을 완료하면 중지되고, 프로세스가 끝나면 LocalTaskJobProcess도 중지됨
[9][10] WorkerChildProcess가 결과를 받고 WorkerProcess에 task 종료와 새로운 task를 받을 수 있음을 알림
[11] WorkerProcess가 ResultBackend에 정보를 저장함
[12] SchedulerProcess가 ResultBackend를 쿼리하여 task 상태를 확인함
대기열
CeleryExecutor를 사용할 때, task를 보낼 Celery 대기열이 명시됩니다. queue는 BaseOperator의 파라미터이므로, 각 task는 각각 다른 대기열에 할당될 수 있습니다. 기본 대기열은 airflow.cfg의 [operator]의 default_queue에서 지정할 수 있습니다. task에 따로 대기열을 설정하지 않으면 기본 대기열로 전달되고, worker 또한 listen할 대기열을 정하지 않으면 기본 대기열을 listen합니다.
worker는 하나 이상의 대기열을 listen할 수 있습니다. worker를 시작할 때
airflow celery worker -q spark,quark
처럼 대기열 이름을 공백 없이 쉼표로 구분하여 여러 대기열을 지정할 수 있습니다. 이렇게 하면 worker는 해당 대기열에 있는 task만 가져옵니다.
이는 worker를 특성화해야 할 때 좋습니다. 예를 들어, 하나의 worker가 수천 개의 task를 수행할 정도로 매우 가벼운 task를 다루거나, Spark 클러스터 같은 특정 환경과 보안 인증이 필요한 작업만 수행하는 worker를 준비할 수 있습니다.
'Airflow' 카테고리의 다른 글
Airflow: Param (파라미터) (0) | 2023.09.19 |
---|---|
Airflow: XCom & 변수 (0) | 2023.09.14 |
Airflow: Executor - (1) 개요 및 로컬 Executor (0) | 2023.09.12 |
Airflow: Celery Executor를 사용하여 클러스터로 DAG 실행하기 (0) | 2023.09.05 |
Airflow: TaskFlow (0) | 2023.08.31 |