본문 바로가기
aiflow

airflow dag의 구조

by kyeongseo.oh 2024. 6. 22.

bashoperator와 pythonoperator를 사용하는 dag를 생성한다. 각 요소에 대한 자세한 설명은 주석을 확인한다.

 

import pendulum
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

tz = pendulum.timezone("Asia/Seoul") # dag 스케쥴링 기준 timezone을 설정합니다.

default_args = {
    'owner': 'kyeongseo.oh', # dag의 관리자를 명시합니다.
    'retries': 1, # 실패 시 1회 재시작 합니다.
    'retry_delay': timedelta(minutes=5) # 재시작 간격을 5분으로 설저합니다.
}

dag = DAG(
    dag_id="chapter_1", # dag id를 설정
    schedule="@once", # dag가 한번만 스케줄되게 설정
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz), # dag가 처음으로 스케쥴되는 시간을 설정
    end_date=pendulum.datetime(2024, 8, 9, 18, 30, tz=tz), # dag가 마지막으로 스케쥴되는 시간을 설정
    tags=["kyeongseo.oh", "chapter_1"], # dag의 tag를 생성
    default_args=default_args, # defualt args 지정
    catchup=False # backfill을 하지 않도록 설정
    )


def hello_world():
    print("hello world")

bash1 = BashOperator(bash_command="echo 'hello task 1'", task_id="hello_bash1", dag=dag)
bash2 = BashOperator(bash_command="echo 'hello task 2'", task_id="hello_bash2", dag=dag)
py = PythonOperator(python_callable=hello_world, task_id="py_hello", dag=dag)

[bash1, bash2] >> py # task 실행 순서 설정

 

 

dag 디렉토리에 python file을 추가하면 airflow ui에 아래와 같이 dag가 등록되는 것을 확인할 수 있다. 왼쪽의 버튼을 클릭해 dag를 스케쥴링할 수 있다.

 

 

dag가 실행되면 graph view에서 dag 구조를 확인할 수 있다.

 

 

task를 클릭한 후 logs 탭을 클릭하면 해당 task의 로그를 확인할 수 있다.

 

 

task 실패 등의 이유로 특정 task를 다시 실행하고자 하는 경우에는 task를 클릭한 후 clear task를 클릭하면 task를 재시작 할 수 있다. 원자성이 보장된 dag를 생성하면 이 기능을 사용해 task가 실패할 경우 dag 전체를 재시작하지 않고, 특정 task만 재시작할 수 있다.

 

 

댓글