bashoperator와 pythonoperator를 사용하는 dag를 생성한다. 각 요소에 대한 자세한 설명은 주석을 확인한다.
import pendulum
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
tz = pendulum.timezone("Asia/Seoul") # dag 스케쥴링 기준 timezone을 설정합니다.
default_args = {
'owner': 'kyeongseo.oh', # dag의 관리자를 명시합니다.
'retries': 1, # 실패 시 1회 재시작 합니다.
'retry_delay': timedelta(minutes=5) # 재시작 간격을 5분으로 설저합니다.
}
dag = DAG(
dag_id="chapter_1", # dag id를 설정
schedule="@once", # dag가 한번만 스케줄되게 설정
start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz), # dag가 처음으로 스케쥴되는 시간을 설정
end_date=pendulum.datetime(2024, 8, 9, 18, 30, tz=tz), # dag가 마지막으로 스케쥴되는 시간을 설정
tags=["kyeongseo.oh", "chapter_1"], # dag의 tag를 생성
default_args=default_args, # defualt args 지정
catchup=False # backfill을 하지 않도록 설정
)
def hello_world():
print("hello world")
bash1 = BashOperator(bash_command="echo 'hello task 1'", task_id="hello_bash1", dag=dag)
bash2 = BashOperator(bash_command="echo 'hello task 2'", task_id="hello_bash2", dag=dag)
py = PythonOperator(python_callable=hello_world, task_id="py_hello", dag=dag)
[bash1, bash2] >> py # task 실행 순서 설정
dag 디렉토리에 python file을 추가하면 airflow ui에 아래와 같이 dag가 등록되는 것을 확인할 수 있다. 왼쪽의 버튼을 클릭해 dag를 스케쥴링할 수 있다.
dag가 실행되면 graph view에서 dag 구조를 확인할 수 있다.
task를 클릭한 후 logs 탭을 클릭하면 해당 task의 로그를 확인할 수 있다.
task 실패 등의 이유로 특정 task를 다시 실행하고자 하는 경우에는 task를 클릭한 후 clear task를 클릭하면 task를 재시작 할 수 있다. 원자성이 보장된 dag를 생성하면 이 기능을 사용해 task가 실패할 경우 dag 전체를 재시작하지 않고, 특정 task만 재시작할 수 있다.
'aiflow' 카테고리의 다른 글
airflow connection과 hook (0) | 2024.08.10 |
---|---|
airflow task context: jinja template, get_current_context (0) | 2024.08.10 |
airflow catchup & backfill (0) | 2024.08.10 |
airflow 실행 날짜 이해: DAG 실행 시간과 execution_date 비교 (0) | 2024.08.09 |
airflow 스케줄링 : preset, cron, 빈도 기반 (0) | 2024.08.09 |
댓글