관리 메뉴

CS

Airflow: Executor - (2) Celery Executor 본문

Airflow

Airflow: Executor - (2) Celery Executor

vcxz 2023. 9. 12. 15:25
 

Celery Executor — Airflow Documentation

 

airflow.apache.org

원격 executor에는 CeleryExecutor, KubernetesExecutor, DaskExecutor, CeleryKubernetesExecutor, LocalKubernetesExecutor가 있는데, 이중 세팅이 간단한 CeleryExecutor만 알아보도록 하겠습니다.

 

  • Celery Executor
  • 구조
  • Task 실행 프로세스
  • 대기열

 

 


Celery Executor

CeleryExecutor는 worker를 scale out할 수 있는 방법 중 하나입니다. Celery 시스템을 사용하여 scheduler 등 Airflow의 프로세스와 여러 worker를, 대기열을 사용하여 연결하는 방식입니다.

이를 사용하려면 Celery 백엔드로 사용할 수 있는 RabbitMQ, Redis 등 브로커를 설치하고, airflow.cfg에서 executor를 CeleryExecutor로 변경해야 합니다.

 

Celery 브로커를 설치하는 과정 등 Celery에 대한 내용은 Celery 문서를 참고하세요.

Airflow의 Celery Executor 관련 구성 항목은 Airflow 구성 Celery 탭 문서에서 찾을 수 있습니다.

 

다음은 worker에서 준비해야 할 필수 사항입니다.

- Airflow가 설치되어 있어야 하며, Airflow CLI가 path에 있어야 함

- 전체 cluster에서 Airflow 구성이 동일해야 함(DAGS_FOLDER 경로 등 달라도 상관없는 건 제외)

- 오퍼레이터가 의존성을 충족해야 함. HiveOperator를 사용한다면 Hive CLI가 path에 있어야 하고, MySqlOperator를 쓴다면 관련 파이썬 라이브러리 경로가 PYTHONPATH에 있어야 함.

- 모든 노드가 동일한 DAG에 접근할 수 있어야 함. 각 노드에 동일한 DAG 파일을 준비해도 되고, 어떠한 곳에 DAG 파일을 두고 모든 노드가 접근할 수 있게 만들어도 됨. 주로 사용하는 방법은 Git 리포지토리에 DAG 파일을 두고 Chef, Puppet, Ansible 등을 통해 동기화하는 것임.

 

worker는 celery 명령어의 서브 명령어로 제어할 수 있습니다.

# worker 실행
airflow celery worker

# worker 중지: Celery process에 SIGTERM을 보냄
airflow celery stop

또한 Celery Flower라는 Celery의 web UI를 사용할 수 있습니다. Flower를 사용하면 worker 정보나 task 결과 등을 볼 수 있습니다.

# Flower 실행
airflow celery flower

Celery Executor를 사용할 때 몇몇 주의사항이 있습니다.

- result backend를 설정해야 함. Airflow 구성의 [celery]result_backend로 설정하며 기본 값은 Airflow와 연결된 데이터베이스임.

- Airflow 구성의 [celery_broker_transport_options]visibility_timeout이 가장 오래 걸리는 task의 예상 시간보다 짧지 않아야 함. 해당 옵션은 Celery 브로커로 Redis나 SQS를 사용할 때만 활성화되는 옵션으로, 해당 시간까지 worker가 반응하지 않으면 다시 다른 worker로 명령을 내림.

- Celery 브로커로 Redis Sentinel을 사용하며 Redis Server에 비밀번호를 설정한 경우 Airflow 구성의 [celery_broker_transport_options]에 Redis Server 비밀번호를 명시해야 함.

- Airflow 구성의 [celery]worker_concurrency를 적절히 정해야 함. 해당 값은 한 worker에서 받을 수 있는 task의 수임.

- 대기열 이름은 256자까지만 가능하나, 브로커마다 다른 제한이 있을 수 있음.

 

게시글에서 WSL을 사용하여 클러스터를 구성해 간단한 DAG를 실행하는 과정을 볼 수 있습니다.

 

 


구조

클러스터는 다음 요소로 구성됩니다.

- Scheduler: 실행해야 할 task를 대기열에 넣음

- Web server: DAG/task의 상태 정보를 제공하는 HTTP Server

- 데이터베이스: DAG/task, 변수, 접속 상태 정보를 저장함

- Celery: 대기열 메커니즘

- Worker: 할당된 task 실행

 

Celery는 두 가지 요소로 구성됩니다.

- 브로커: 실행 명령어 보관

- Result backend: 마무리한 명령어의 상태 보관

 

이 요소들은 아래처럼 연결됩니다(숫자는 실행 순서가 아닙니다.).

Celery Executor 클러스터

- 1. Web server > Worker: task 실행 로그를 가져옴

- 2. Web server > DAG 파일: DAG 구조를 가져옴

- 3. Web server > 데이터베이스: task 상태를 가져옴

 

- 4. Worker > DAG 파일: DAG 구조를 가져오고 task를 실행함

- 5. Worker > 데이터베이스: 접속, 변수, XCom에 대한 정보를 가져오고, 또 저장함

- 6. Worker > Celery result backend: task 상태를 저장함

- 7. Worker > Celery 브로커: 실행 명령어를 저장함

 

- 8. Scheduler > DAG 파일: DAG 구조를 가져오고 task를 실행함

- 9. Scheduler > 데이터베이스: DAG Run과 관련된 task를 저장함

- 10. Scheduler > Celery result backend: 마무리한 task의 상태를 가져옴

- 11. Scheduler > Celery 브로커: 실행할 명령어를 넣음

 

 


Task 실행 프로세스

우선 두 프로세스가 실행 중입니다.

- SchedulerProcess: task를 프로세스하고 CeleryExecutor를 통해 실행함

- WorkerProcess: 대기열에 새로운 task가 들어오는지 관찰함

-- WorkerChildProcess: 새로운 task를 기다림

 

두 데이터베이스가 있습니다.

- QueueBroker

- ResultBackend

 

프로세스 중에 두 프로세스가 생성됩니다.

- LocalTaskJobProcess: LocalTaskJob에 따른 프로세스로, RawTaskProcess를 모니터링함. TaskRunner를 사용하여 새로운 프로세스를 시작함

- RawTaskProcess: execute() 등의 코드를 프로세스함

 

이 프로세스는 다음 순서로 수행됩니다.

Task 실행 프로세스

[1] SchedulerProcess가 task를 프로세스하고, 수행되어야 하는 task를 QueueBroker로 보냄

[2] SchedulerProcess가 task 상태를 확인하기 위해 ResultBackend를 주기적으로 쿼리하기 시작함

[3] QueueBrokerWorkerProcess에 task 정보를 보냄

[4] WorkerProcess가 하나의 task를 하나의 WorkerChildProcess에 할당함

[5] WorkerChildProcessexecute_command()를 통해 task를 실행함. 이는 LocalTaskJobProcess를 생성함.

[6] LocalTaskJobProcessLocalTaskJob 클래스를 따름. 이는 TaskRunner를 통해 새로운 프로세스(RawTaskProcess)를 시작함.

[7][8] RawTaskProcess가 코드 실행을 완료하면 중지되고, 프로세스가 끝나면 LocalTaskJobProcess도 중지됨

[9][10] WorkerChildProcess가 결과를 받고 WorkerProcess에 task 종료와 새로운 task를 받을 수 있음을 알림

[11] WorkerProcessResultBackend에 정보를 저장함

[12] SchedulerProcessResultBackend를 쿼리하여 task 상태를 확인함

 

 


대기열

CeleryExecutor를 사용할 때, task를 보낼 Celery 대기열이 명시됩니다. queueBaseOperator의 파라미터이므로, 각 task는 각각 다른 대기열에 할당될 수 있습니다. 기본 대기열은 airflow.cfg[operator]default_queue에서 지정할 수 있습니다. task에 따로 대기열을 설정하지 않으면 기본 대기열로 전달되고, worker 또한 listen할 대기열을 정하지 않으면 기본 대기열을 listen합니다.

 

worker는 하나 이상의 대기열을 listen할 수 있습니다. worker를 시작할 때

airflow celery worker -q spark,quark

처럼 대기열 이름을 공백 없이 쉼표로 구분하여 여러 대기열을 지정할 수 있습니다. 이렇게 하면 worker는 해당 대기열에 있는 task만 가져옵니다.

 

이는 worker를 특성화해야 할 때 좋습니다. 예를 들어, 하나의 worker가 수천 개의 task를 수행할 정도로 매우 가벼운 task를 다루거나, Spark 클러스터 같은 특정 환경과 보안 인증이 필요한 작업만 수행하는 worker를 준비할 수 있습니다.