일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- lambda
- boto3
- executor
- XCOM
- Data Firehose
- github actions
- Git
- S3
- dagrun
- DAG
- Scheduling
- authoring
- kinesis
- pipeline
- Operator
- RDS
- dynamic task mapping
- testing
- TaskFlow
- Concept
- sqs
- git book
- AWS
- airflow
- credentials
- mysql
- Task
- celery
- ci/cd
- SecretsManager
- Today
- Total
CS
Airflow: 데이터를 데이터베이스에 적재하고 주기적으로 갱신하는 파이프라인 만들기 본문
Airflow 사이트의 Building a Running Pipeline 문서를 따라
인터넷에 있는 데이터를 받아 로컬 데이터베이스에 입력하는 파이프라인을 만드는 과정을 알아보겠습니다.
MySQL 설치 및 여러 설정
위 문서에서는 docker에서 airflow를 실행하였는데, docker는 복잡하다고 생각해서 docker 없이 진행하였습니다.
또한 해당 문서에서는 docker에 포함된 postgreSQL을 썼는데, 저는 MySQL이 더 익숙하기 때문에 우선 MySQL을 설치하겠습니다.
# MySQL 설치
sudo apt-get install mysql-server
# MySQL 실행
sudo systemctl start mysql
# Airflow MySQL 패키지 설치
pip install apache-airflow-providers-mysql
----------
## mysql 설치 시
# dpkg: error processing package mysql-server (--configure):
# 발생하는 경우
# 설치 중 오류가 발생한 상태 그대로
# /etc/mysql/conf.d/ 또는
# /etc/mysql/mysql.conf.d/ 에 있는 cnf 파일에서
# bind-address = 127.0.0.1 찾기
# 해당 부분 주석 처리 후
# skip-networking 기재하고 저장
# 네트워킹을 생략하고 일단 사용 가능
# WSL 환경인 만큼 IP 관련 문제인 듯
이후 airflow에서 MySQL에 접속할 수 있도록 MySQL에 사용자를 만듭니다.
그리고 airflow용 데이터베이스를 생성한 후 만든 사용자에게 해당 데이터베이스에 대한 권한을 부여합니다.
### MySQL 접속
# 비밀번호를 물으면 그냥 enter 입력
mysql -u root -p
----------
## access denied 되는 경우
# root 계정은 기본적으로 auth_socket이라는 인증 방식을 사용하는데
# 어떤 경우 이 방식으로 로그인이 되지 않는 것 같음
# 따라서 다른 계정으로 로그인 후 root 계정의 인증 방식과 비밀번호를 변경하여 사용
# 기본적으로 존재하는 계정 확인
cat /etc/mysql/debian.cnf
# user(debian-sys-maint)와 password 확인
# 해당 계정으로 로그인
mysql -u debian-sys-maint -p
# 위에서 확인한 비밀번호 입력
# mysql 로그인 후 root 계정의 인증 방식과 비밀번호 변경
alter user 'root'@'localhost' identified with mysql_native_password by '비밀번호';
### mysql 사용자 및 데이터베이스 생성
# 이름: airflow, 비밀번호: airflow인 사용자 생성
# localhost를 붙이면 localhost에서만 접속 가능
create user airflow@localhost identified by 'airflow';
# airflow용 데이터베이스 airflow 생성
create database airflow;
# 사용자 airflow에게 데이터베이스 airflow에 대한 모든 권한 부여
grant all privileges on airflow.* to airflow@localhost;
# 이후 사용할 LOAD DATA 문을 위해 LOCAL INFILE 허용
set global local_infile=1;
사용자 이름이나 비밀번호, 데이터베이스명은 마음대로 설정해도 됩니다.
그리고 airflow가 앞으로 데이터베이스로 MySQL을 사용하도록 설정합니다.
## airflow.cfg
[database]
# 데이터베이스 URL 기재
sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow
MySQL의 기본 포트는 3306입니다.
airflow의 데이터베이스를 변경하는 자세한 내용은 Set up a Database Backend 문서를 참고하세요.
# airflow가 데이터베이스를 인식하도록 init
airflow db init
그리고 airflow를 실행하여 MySQL에 대한 connection을 만듭니다.
# 스탠드얼론 모드로 airflow 실행
airflow standalone
airflow UI (http://localhost:8080/)에 들어가서 'Admin'의 'Connections'에 들어갑니다.
더하기 모양 버튼을 눌러 connection을 새로 만듭니다.
connection id는 마음대로 설정해도 되고, 나머지는 위에서 설정한 대로 기재합니다.
여기서 만든 connection id를 통해 나중에 스크립트에서 MySQL에 접속할 것입니다.
DAG 작성
기본적으로 Building a Running Pipeline 문서에 있는 스크립트를 따르되 postgreSQL 부분을 MySQL로 변경하였습니다.
작업 내용은 인터넷에 있는 CSV 파일 내용을 데이터베이스의 1차 테이블에 넣고, 이전 데이터가 저장된 2차 테이블과 비교하여 변경된 것만 수정하는 주기적인 데이터베이스 업데이트입니다.
@dag(
dag_id="process-employees",
schedule="0 12 * * *",
start_date=pendulum.datetime(2023, 7, 31, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
TaskFlow API로 'process-employees'라는 DAG를 만듭니다.
pendulum은 쉽게 날짜를 생성할 수 있는 모듈입니다. 문서를 따르면 Airflow에서 datetime을 다룰 때는 pendulum을 쓰는 게 좋을 거라고 합니다.
start_date를 최근 날짜로 정할 경우 기본 시간대인 UTC가 우리나라 시간대인 KST보다 9시간 느리다는 것을 유념하길 바랍니다. start_date가 dag가 실행되는 시간인 logical_date보다 미래면 dag가 성공하나 아무 작업도 수행되지 않는데, 시간대가 헷갈려서 문제가 발생할 수 있습니다.
create_employees_table = SQLExecuteQueryOperator(
task_id="create_employees_table",
conn_id="mysql_conn_id",
sql="sql/employees_schema.sql",
)
create_employees_temp_table = SQLExecuteQueryOperator(
task_id="create_employees_temp_table",
conn_id="mysql_conn_id",
sql="sql/employees_temp_schema.sql",
)
가장 먼저 수행하는 task로, 데이터베이스에 1차 테이블(employees_temp)과 2차 테이블(employees)을 만듭니다.
Airflow는 Airflow를 사용하는 일반적인 상황들에 바로 쓸 수 있게 미리 정의된 오퍼레이터들을 제공하고 있습니다. 이런 오퍼레이터들은 필수적인 인자만 넣어 쉽게 사용할 수 있습니다.
저는 SQL을 실행하기 위한 오퍼레이터 중 SQLExecuteQueryOperator를 사용하였습니다. SQLExecuteQueryOperator는 common-sql 패키지에 있는 오퍼레이터로, 데이터베이스 종류에 관계 없이 사용할 수 있어 편리합니다.
- conn_id: 사용할 connection id입니다. 위 airflow UI에서 추가한 것을 사용합니다.
- sql: 실행할 SQL 쿼리입니다. 직접 SQL 문을 작성해도 되고 예시처럼 sql 파일 경로를 입력해도 됩니다. 경로는 스크립트 파일 위치를 기준으로 상대 경로를 적어야 합니다.
# sql/employees_schema.sql
CREATE TABLE IF NOT EXISTS employees (
`serial_number` INT PRIMARY KEY,
`company_name` VARCHAR(50),
`employee_markme` VARCHAR(50),
`description` VARCHAR(50),
`leave` INT
);
# sql/employees_temp_schema.sql
DROP TABLE IF EXISTS employees_temp;
CREATE TABLE employees_temp (
`serial_number` INT PRIMARY KEY,
`company_name` VARCHAR(50),
`employee_markme` VARCHAR(50),
`description` VARCHAR(50),
`leave` INT
);
실행할 SQL 문입니다.
2차 테이블인 employees는 없으면 만들고, 1차 테이블인 employees_temp는 있으면 삭제하고 새로 만듭니다. 여기서 2차 테이블은 계속 유지하는 용도고 1차 테이블은 쓸 때마다 초기화해야 하기 때문입니다.
SQL에서 컬럼명 같은 직접 입력하는 문자열은 웬만하면 인용 부호로 묶어주는 게 좋습니다. 키워드랑 겹치면 오류가 납니다. 데이터베이스 종류마다 사용할 수 있는 인용부가 다른 것도 주의해야 합니다.
@task
def get_data():
data_path = os.path.dirname(os.path.abspath(__file__)) + "/files/employees.csv"
os.makedirs(os.path.dirname(data_path), exist_ok=True)
url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
file.write(response.text)
두 번째로 수행할 task로, 인터넷에 있는 CSV 파일의 내용을 얻어 컴퓨터에 CSV 파일로 저장한 후, 이를 데이터베이스에 입력합니다.
저는 CSV 파일을 스크립트 파일 위치에 있는 files 폴더에 저장하려 했기 때문에, os.path 모듈을 사용하여 스크립트 파일의 절대 경로를 얻은 후 뒤에 추가 경로를 붙였습니다. 그리고 해당 경로가 존재하지 않으면 만들도록 합니다.
그리고 requests 모듈을 통해 해당 페이지의 텍스트를 얻어 위에서 만든 경로에 파일로 작성합니다.
query = f"""
LOAD DATA LOCAL INFILE '/home/airflow/airflow/dags/files/employees.csv'
INTO TABLE employees_temp
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
IGNORE 1 LINES;
"""
각 데이터베이스 종류마다 파일 내용을 테이블에 바로 입력할 수 있는 방법이 있는데, MySQL에서는 LOAD DATA 문을 쓸 수 있습니다. LOAD DATA 문은 내부적으로 그냥 INSERT하는 것과 다르게 작동하여 훨씬 빠르게 데이터를 입력할 수 있다고 합니다.
파일의 컬럼과 테이블의 컬럼 순서와 개수가 같기 때문에 컬럼을 적을 필요는 없었습니다.
- LOCAL: 클라이언트 측에 있는 파일을 사용하는 키워드입니다. 빼면 서버 측에 있는 파일을 부릅니다. 여기서는 airflow와 MySQL이 같은 컴퓨터에서 실행되기 때문에 헷갈렸는데, 밑에서 볼 hook의 파라미터도 그렇고 LOCAL을 붙이는 게 맞는 것 같습니다. LOCAL을 빼면 오류가 났습니다.
- 파일 경로는 절대 경로로 쓰는 게 좋을 것 같습니다. 상대 경로는 MySQL이 있는 위치에서 찾는 것 같습니다.
- FIELDS TERMINATED BY: 필드(컬럼) 구분자를 씁니다. 예시 파일은 쉼표(',')로 구분되어 있습니다.
- LINES TERMINATED BY: 줄(로우) 구분자를 씁니다. 예시 파일은 줄바꿈('\n')이 있습니다.
- IGNORE 1 LINES: 위에서부터 몇 줄을 무시할 수 있습니다. 예시 파일의 첫 줄은 컬럼명이 있는 헤더이기 때문에 사용하였습니다.
try:
mysql_hook = MySqlHook(schema='airflow', mysql_conn_id="mysql_conn_id", local_infile=True)
conn = mysql_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
conn.commit()
return 0
except Exception as e:
raise AirflowException("Exception Occurs:", e)
MySqlHook을 사용하여 connection 객체를 얻은 후 커서를 통해 쿼리를 실행합니다.
- schema: 접근할 데이터베이스 스키마입니다.
- mysql_conn_id: 사용할 connection id입니다. 참고로 현재(2023년 8월 2일) airflow 문서에서는 파라미터가 connection이라는 이름으로 되어 있는 것 같은데, connection으로 적으면 오류가 났습니다.
- local_infile: LOAD DATA LOCAL INFILE 문을 사용하려면 True를 명시해야 합니다. 기본 값은 False입니다. LOCAL INFILE을 쓰려면 서버와 클라이언트 두 측에서 모두 허용을 해야 하기 때문에, MySQL에서도 허용하고 여기서도 허용하는 것 같습니다.
문서에서는 except에 return 1만 기재하였는데, 이 경우 오류가 나도 그냥 1이 반환될 뿐 실패 처리가 되지 않기 때문에, AirflowException 객체를 사용하여 예외 발생 시 task가 실패하도록 하였습니다.
@task
def merge_data():
query = """
INSERT INTO employees
SELECT DISTINCT *
FROM employees_temp t
ON DUPLICATE KEY
UPDATE
`employee_markme` = t.employee_markme,
`description` = t.description,
`leave` = t.leave;
"""
마지막으로 실행할 task로, 1차 테이블(employees_temp)의 레코드를 2차 테이블(employees)에 입력하되, 기본 키(여기서는 serial_number)가 동일한 레코드가 이미 존재하면 특정 열(여기서는 employee_markme, description, leave)만 갱신합니다.
- DISTINCT: 중복 레코드가 혹시 있을지 모르기 때문에 적은 것 같습니다.
- ON DUPLICATE KEY: key가 중복되는 경우에는 아래 작업을 수행하라는 키워드입니다.
try:
mysql_hook = MySqlHook(schema='airflow', mysql_conn_id="mysql_conn_id")
conn = mysql_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
conn.commit()
return 0
except Exception as e:
raise AirflowException("Exception Occurs:", e)
마찬가지로 MySqlHook을 사용하여 쿼리를 실행합니다.
[create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
dag = ProcessEmployees()
마지막으로 task 간 종속성을 결정합니다. 여러 task를 대괄호('[]')로 묶으면 같은 레벨에서 실행할 수 있습니다.
그리고 함수를 스크립트의 top level에서 실행하여 Airflow가 DAG를 인식할 수 있도록 합니다.
dag 스크립트 파일을 dags 폴더에 위치시킨 후 airflow가 dag를 다시 검색하는 시간이 지나면 airflow UI에서 해당 dag를 찾을 수 있습니다. 그리고 실행 버튼을 눌러 수동적으로 실행할 수 있습니다.
만약 실패한 경우 task별 log를 참고할 수 있습니다.
실제로 데이터가 저장되었는지 확인하려면, 터미널에서 MySQL을 실행하여 간단히 확인해보거나, MySQL Workbench 같은 프로그램을 통해 MySQL에 접속하여 볼 수 있습니다.
'Airflow' 카테고리의 다른 글
Airflow: DAG (0) | 2023.08.21 |
---|---|
Airflow: 구조 개요 (0) | 2023.08.17 |
Airflow: TaskFlow API의 여러 기능 (0) | 2023.07.23 |
Airflow: TaskFlow API 패러다임으로 DAG 정의 (0) | 2023.07.22 |
Airflow: DAG 형식 살펴보기 (0) | 2023.06.27 |