관리 메뉴

CS

Airflow: 데이터에 의한 스케줄링 본문

Airflow

Airflow: 데이터에 의한 스케줄링

vcxz 2023. 9. 26. 04:12

Airflow Data-aware scheduling

 

Data-aware scheduling — Airflow Documentation

 

airflow.apache.org

Astronomer Datasets and data-aware scheduling in Airflow

 

Datasets and data-aware scheduling in Airflow | Astronomer Documentation

Using datasets to implement DAG dependencies and scheduling in Airflow.

docs.astronomer.io


데이터에 의한 스케줄링

DAG를 시간에 기반해 스케줄할 수도 있지만, dataset을 업데이트하는 task에 따라 스케줄할 수도 있습니다.

from airflow.datasets import Dataset

# 기존 오퍼레이터
with DAG(...):
    # producer task
    MyOperator(
        outlets=[Dataset("s3://dataset-bucket/example.csv")],
        ...,
    )

# consumer DAG
with DAG(
    schedule=[Dataset("s3://dataset-bucket/example.csv")],
    ...,
):
    ...
    
## TaskFlow
@dag(...)
def my_dag():
    # producer task
    @task(outlets=[Dataset("s3://dataset-bucket/example.csv")])
    def producer():
        ...

# consumer DAG
@dag(
    schedule=[Dataset("s3://dataset-bucket/example.csv")],
    ...,
)
def consumer():
    ...

Dataset scheduling DAG의 Web UI 표시 예시

dataset 스케줄링은 worker를 차지하지 않기 때문에, sensor나 다른 DAG 의존성 구현 방식보다 나을 수 있습니다.

 

Dataset?

Airflow의 dataset은 논리적으로 그룹핑한 데이터를 뜻합니다. 딱히 파일 종류에 대한 제한은 없는 것으로 보이며, 텍스트 파일이나 CSV 파일 등을 dataset으로 사용할 수 있습니다.

 

dataset은 "producer" task에 의해 업데이트되고, dataset이 업데이트되면 "consumer" DAG가 트리거됩니다. 즉, producer task가 upstream, consumer DAG가 downstream이 되는 DAG 의존성이 생성된 것입니다.

※ producer task가 dataset 대상 데이터를 건들지 않고, consumer DAG가 dataset 대상 데이터를 사용하지 않더라도 dataset 스케줄링은 작동합니다. 따라서 DAG 의존성을 만드는 측면에서만 사용해도 무방합니다.

 

dataset은 URI로 정의할 수 있습니다.

from airflow.datasets import Dataset

example_dataset = Dataset("s3://dataset-bucket/example.csv")

URI는 문자열로만 인식되므로, 정규표현식이나 glob 패턴은 사용할 수 없습니다. 그리고 대소문자를 구분합니다.

 

dataset URI에는 두 가지 규칙이 있습니다.

1. ASCII 문자로만 이루어져야 합니다.

2. scheme으로 airflow 사용 불가: 예약어이기 때문입니다.

# 유효하지 않은 dataset:
not_ascii = Dataset("èxample_datašet")
reserved = Dataset("airflow://example_dataset")

dataset URI는 절대 URI일 필요는 없고, scheme-less이어도, 상대 URI이어도, 간단한 경로나 문자열이어도 됩니다.

# 유효한 dataset:
schemeless = Dataset("//example/dataset")
csv_file = Dataset("example_dataset")

dataset에 extra dict를 포함해도 됩니다.

example_dataset = Dataset(
    "s3://dataset/example.csv",
    extra={"team": "trainees"},
)

※ dataset URI와 extra 필드는 암호화되지 않고 일반 텍스트로 Airflow 데이터베이스에 저장됩니다. 따라서 보안 증명 같은 민감한 정보는 피해야 합니다.

 

 

DAG에서 dataset 사용하기

DAG에서 데이터 의존성을 명시하기 위해 dataset을 사용할 수 있습니다.

example_dataset = Dataset("s3://dataset/example.csv")

with DAG(dag_id="producer", ...):
    BashOperator(task_id="producer", outlets=[example_dataset], ...)

with DAG(dag_id="consumer", schedule=[example_dataset], ...):
    ...

producer DAG에 있는 producer task가 성공하면, Airflow는 consumer DAG를 스케줄합니다. dataset은 producer task가 성공하였을 때만 업데이트 상태가 되므로, 해당 task가 실패하거나 스킵되었을 때는 업데이트 상태가 되지 않아 consumer DAG를 스케줄하지 않습니다.

※ 유의사항

- DAG의 schedule 인수는 dataset을 받거나 시간 기반 옵션을 받거나 둘 중 하나만 받을 수 있습니다. 둘을 함께 사용할 수는 없습니다.

- dataset 스케줄링은 동일한 Airflow에 존재하는 DAG 사이에서만 가능합니다.

- Airflow는 DAG 및 task 관계에서만 dataset을 보기 때문에, dataset의 원 데이터가 Airflow 외의 원인으로 변경되어도 dataset 스케줄링에 영향을 주지 않습니다.

- consumer DAG는 producer task가 성공하는 순간 스케줄됩니다. 따라서 producer task를 포함한 DAG가 아직 끝나지 않았어도 실행될 수 있으며, 해당 DAG 내에 동일한 dataset에 대한 producer task가 또 있어도 마찬가지입니다.

 

 

여러 dataset을 함께 사용하기

DAG의 schedule 인수는 list를 받을 수 있기 때문에, 여러 dataset을 함께 받을 수 있습니다. 이때 기재된 모든 dataset이 해당 DAG의 지난 번 실행 이후로 최소 한 번은 업데이트되어야 DAG를 스케줄합니다.

with DAG(
    dag_id="multiple_datasets_example",
    schedule=[
        example_dataset_1,
        example_dataset_2,
        example_dataset_3,
    ],
    ...,
):
    ...

 

 

 

UI에서의 dataset

두 dataset을 기다리는 DAG에서 한 dataset만 업데이트된 경우
"Datasets" 탭
"Browse" 탭의 "DAG Dependencies"

 

 

DAG 예시

producer task

from pendulum import datetime
from airflow import Dataset
from airflow.decorators import dag, task

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Dataset("file://localhost/home/airflow/airflow/dags/files/cocktail_instructions.txt")
INFO = Dataset("file://localhost/home/airflow/airflow/dags/files/cocktail_info.txt")

@dag(
    start_date=datetime(2022, 10, 1),
    schedule=None,
    catchup=False,
)
def datasets_producer_dag():
    # 외부에서 데이터를 가져오는 task
    @task
    def get_cocktail(api):
        import requests

        r = requests.get(api)
        return r.json()

    # "INSTRUCTIONS" dataset에 대한 task
    @task(outlets=[INSTRUCTIONS])
    def write_instructions_to_file(response):
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_instructions = response["drinks"][0]["strInstructions"]
        msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"

        f = open("/home/airflow/airflow/dags/files/cocktail_instructions.txt", "a")
        f.write(msg)
        f.close()

    # "INFO" dataset에 대한 task
    @task(outlets=[INFO])
    def write_info_to_file(response):
        import time

        time.sleep(30)
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_category = response["drinks"][0]["strCategory"]
        alcohol = response["drinks"][0]["strAlcoholic"]
        msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
        f = open("/home/airflow/airflow/dags/files/cocktail_info.txt", "a")
        f.write(msg)
        f.close()

    cocktail = get_cocktail(api=API)
    write_instructions_to_file(cocktail)
    write_info_to_file(cocktail)

datasets_producer_dag()

consumer DAG

from pendulum import datetime
from airflow import Dataset
from airflow.decorators import dag, task

INSTRUCTIONS = Dataset("file://localhost/home/airflow/airflow/dags/files/cocktail_instructions.txt")
INFO = Dataset("file://localhost/home/airflow/airflow/dags/files/cocktail_info.txt")

@dag(
    dag_id="datasets_consumer_dag",
    start_date=datetime(2022, 10, 1),
    schedule=[INSTRUCTIONS, INFO],  # 두 dataset으로 스케줄
    catchup=False,
)
def datasets_consumer_dag():
    @task
    def read_about_cocktail():
        cocktail = []
        for filename in ("info", "instructions"):
            with open(f"home/airflow/airflow/dags/files/cocktail_{filename}.txt", "r") as f:
                contents = f.readlines()
                cocktail.append(contents)

        return [item for sublist in cocktail for item in sublist]

    read_about_cocktail()

datasets_consumer_dag()

Datasets
DAG Dependencies

 

 

 

'Airflow' 카테고리의 다른 글

Airflow: Logging  (0) 2023.10.07
Airflow: Timetable  (0) 2023.09.29
Airflow: Time Zone  (0) 2023.09.25
Airflow: 동적 Task 매핑  (0) 2023.09.22
Airflow: Param (파라미터)  (0) 2023.09.19