관리 메뉴

CS

Airflow: Logging 본문

Airflow

Airflow: Logging

vcxz 2023. 10. 7. 00:07

Airflow Logging for Tasks

 

Logging for Tasks — Airflow Documentation

 

airflow.apache.org

Astronomer Airflow logging

 

Airflow logging | Astronomer Documentation

An introduction to Airflow logging.

docs.astronomer.io

 


Logging

Airflow는 기본적으로 task 로그를 남기며, 이를 Airflow UI에서 각 task별로 확인할 수 있습니다. Airflow는 기본적으로 task 로그를 파일로 남기는 FileTaskHandler 인터페이스를 제공하는데, 이 handler는 task가 실행되고 있는 worker에서 로그를 가져올 수 있습니다. 또한 Airflow 커뮤니티는 S3 같은 외부 저장소에도 로그를 전달할 수 있도록 여러 프로바이더 패키지를 제공합니다.

기본 개념

Airflow는 파이썬의 logging 모듈을 사용합니다. logging 모듈은 다음 클래스들을 포함합니다.

■ Logger: 코드가 직접적으로 상호작용하는 대상으로, Airflow는 root, flask_appbuilder, airflow.processor, airflow.task 총 4개의 logger를 정의하고 있습니다.

■ Handler: 로그 기록을 특정 위치로 전달합니다. Airflow는 기본적으로 RedirectStdHandler, FileProcessorHandler, FileTaskHandler를 사용합니다.

■ Filter: 어떤 로그 기록이 제거될 것인지 정합니다. Airflow는 SecretsMasker를 사용해 민감한 정보가 출력되지 않도록 합니다.

■ Formatter: 로그 기록의 형식을 정합니다. Airflow는 다음 두 formatter를 사용합니다.

# airflow_colored
"[%(blue)s%(asctime)s%(reset)s] {%(blue)s%(filename)s:%(reset)s%(lineno)d} %(log_color)s%(levelname)s%(reset)s - %(log_color)s%(message)s%(reset)s"

# airflow
"[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"

로그 형식은 airflow.cfglog_format에서 변경할 수 있습니다.

 

logging 모듈은 총 6개(CRITICAL, ERROR, WARNING, INFO, DEBUG, NOTSET)의 로그 레벨을 갖습니다. CRITICAL이 가장 강력하며 DEBUG가 가장 약합니다. logger에 로그 레벨을 정하면 해당 logger는 정해진 레벨보다 약한 로그는 전달하지 않습니다.

 

Airflow의 네 가지 기본 logger는 각각 정해진 위치와 formatter를 가진 handler를 갖습니다.

- root (INFO 레벨): RedirectStdHandlerairflow_colored를 사용합니다. sys.stderr/stdout에 출력하며, 특정 logger가 정해져 있지 않으면 기본 만능 logger로서 행동합니다.

- flask_appbuilder (WARNING): RedirectStdHandlerairflow_colored를 사용합니다. webserver의 로그를 sys.stderr/stdout에 출력합니다.

- airflow.processor (INFO): FileProcessorHandlerairflow를 사용합니다. scheduler의 로그를 파일 시스템에 전달합니다.

- airflow.task (INFO): FileTaskHandlerairflow를 사용합니다. task의 로그를 파일 시스템에 전달합니다.

로그 레벨은 airflow.cfglogging_level 항목들을 통해 변경할 수 있습니다.

구성

Airflow 로그 설정은 airflow.cfg[logging] 탭에서 할 수 있습니다.

 

base_log_folder를 통해 기본 handler인 FileTaskHandler가 로그 파일을 어떤 디렉토리에 저장할 것인지 구성할 수 있습니다. 기본 값은 AIRFLOW_HOME을 가리킵니다.

 

log_filename_template에서 로그 파일 이름 형식을 지정할 수 있습니다. 기본 형식은 다음과 같습니다.

- 보통 task의 경우: dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log

- 동적 매핑 task의 경우: dag_id={dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log

이는 airflow.cfglog_filename_template로 변경할 수 있습니다.

 

어떤 task handler가 설정되어 있는지 확인하려면 airflow info 명령어를 쓰면 됩니다.

$ airflow info

Apache Airflow
version                | 2.7.0
executor               | LocalExecutor
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn       | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder            | /files/dags
plugins_folder         | /root/airflow/plugins
base_log_folder        | /root/airflow/logs
remote_base_log_folder |

[skipping the remaining outputs for brevity]

Airflow에서 기본 로그 구성(DEFAULT_LOGGING_CONFIG)을 어떻게 하고 있는지는 소스 코드에서 확인할 수 있습니다.

로그 출력

로컬에서 Airflow를 실행하면 다음 위치로 로그가 출력됩니다.

■ Scheduler: 콘솔에 로그가 출력되며, $AIRFLOW_HOME/logs/scheduler에 저장됨

■ Webserver 및 Triggerer: 콘솔에 로그가 출력되며, triggerer의 경우 결부된 task의 로그에서도 찾을 수 있음

■ Task: Airflow UI에 로그가 출력되며, $AIRFLOW_HOME/logs에 저장됨

코드로 task 로그 기록

Airflow는 파이썬의 logging을 사용하며, root logger가 task 로그를 기록하도록 설정하였습니다.

대부분의 오퍼레이터는 자동적으로 task 로그를 씁니다. 모든 오퍼레이터는 LoggingMixin이라는 클래스를 상속받는데, 여기에 logger를 생성하는 코드가 있습니다.

 

따라서 다음 방법들로 코드에서 task 로그를 쓸 수 있습니다.

- 오퍼레이터.log로 logger를 얻어 로그 작성

- print()로 stdout에 출력(추천하지 않으나 유용할 수 있음)

- 파이썬 logging을 사용해 logger를 생성하여 로그 작성

# 기존 airflow.task를 사용해 로그 작성하기
import logging

task_logger = logging.getLogger("airflow.task")
# airflow.task는 task 로그만 작성하므로, task 함수 밖에 있는 경우 기록되지 않음

worker와 triggerer에서 로그 전송

대부분의 task handler는 task가 완료되어야 로그를 전송합니다.

따라서 Airflow는 다음 경우 실시간으로 로그를 제공하기 위해 HTTP 서버로 로그를 전달합니다.

- SequentialExecutor 또는 LocalExecutor 사용 시, scheduler가 작동 중일 때

- CeleryExecutor 사용 시, worker가 작동 중일 때

triggerer에서는 --skip-serve-logs 옵션을 사용하지 않았다면 로그를 전송합니다.

 

worker에서 로그를 전달할 때 사용하는 포트는 airflow.cfgworker_log_server_port에서, triggerer의 경우 triggerer_log_server_port에서 변경할 수 있습니다. 기본 값은 worker는 8793, triggerer는 8794입니다.

webserver와 worker 간 연결은 airflow.cfgsecret_key를 사용합니다.

외부 저장소에 로그 전송

S3 같은 외부 저장소에 로그를 전달하기 위해서는 관련 provider 패키지를 설치하고 UI에서 connection을 생성한 뒤 airflow.cfg를 적절히 변경하면 됩니다.

다음은 S3에 task 로그를 전달하는 예시입니다.

1. apache-airflow-providers-amazon 설치

2. Airflow UI의 Admin > Connections에서, login을 액세스 키로 password를 비밀 액세스 키로 하여 connection을 생성

3. airflow.cfg에서,

  • remote_logging=True
  • remote_log_conn_id=${2에서 생성한 connection}
  • remote_base_log_folder=s3://${S3 버킷 이름}/${경로}
  • <옵션> delete_local_logs=True: 로컬에 남은 로그는 삭제
  • <옵션> encrypt_s3_logs=True: S3의 서버 측 암호화 사용

이렇게 설정하면 Airflow는 provider 패키지에 포함된 S3TaskHandler로 FileTaskHandler를 대체합니다.

Blob 저장소에 전송 시 유의 사항

Airflow의 외부 task handler는 스트리밍 handler (엘라스틱서치, AWS Cloudwatch 등)와 blob 저장소 handler (S3, GCS 등)으로 구분할 수 있습니다.

이중 blob 저장소 handler의 경우, 제대로 보관하기 위해 타임스탬프를 파싱합니다. 만약 logging에 자작 formatter를 사용하고 있다면 airflow.cfg의 interleave_timestamp_parser에 이에 대한 parser를 제공해야 합니다.

 

 

 

'Airflow' 카테고리의 다른 글

Airflow: 동적 DAG 생성  (0) 2023.10.16
Airflow: Best Practice 1/3 - DAG 작성  (0) 2023.10.12
Airflow: Timetable  (0) 2023.09.29
Airflow: 데이터에 의한 스케줄링  (0) 2023.09.26
Airflow: Time Zone  (0) 2023.09.25