본문 바로가기
aiflow

airflow 스케줄링 : preset, cron, 빈도 기반

by kyeongseo.oh 2024. 8. 9.

airflow를 스케줄링하는 방법으로는 preset 기반, cron 기반, 빈도 기반 3가지가 있다.

 

1. preset 기반 스케줄링

프리셋은 cron 표현식의 대안으로 사용되며, 더 읽기 쉽고 이해하기 쉽다. 주요 프리셋들과 그 의미는 아래와 같다.

이름 의미 비고
None 스케줄링하지 않고, 외부에서
트리거될 때만 실행한다.
 
@once 1번만 실행한다.  
@continuous 이전 실행이 끝나자마자 다시 실행한다. max_active_runs=1 일때만 사용 가능
@hourly 매 시 정각에 한번씩 실행한다.  
@daily 매일 자정에 1회 실행한다.  
@weekly 매주 일요일 자정에 1회 실행한다.  
@monthly 매월 1일 정각에 한번 실행한다.  
@quarterly 분기별로 1회 첫날 자정에 실행한다.  
@yearly 매년 1월 1일 자정에 1회 실행한다.  

 

프리셋 기반 스케줄링 예제

import pendulum
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

tz = pendulum.timezone("Asia/Seoul") # dag 스케쥴링 기준 timezone을 설정합니다.

default_args = {
    'owner': 'kyeongseo.oh', # dag의 관리자를 명시합니다.
    'retries': 1, # 실패 시 1회 재시작 합니다.
    'retry_delay': timedelta(minutes=5), # 재시작 간격을 5분으로 설저합니다.
}

dag = DAG(
    dag_id="chapter_1", # dag id를 설정합니다.
    schedule="@continuous", # 이전 실행이 끝나자마자 바로 다시 실행한다.
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz), # dag가 처음으로 스케쥴되는 시간을 설정합니다.
    tags=["kyeongseo.oh", "continuous"], # dag의 tag를 생성합니다.
    catchup=False, # backfill을 하지 않도록 설정합니다.
    max_active_runs = 1 # @continuous을 사용할 때는 chapter_1 dag가 동시에 여러개 실행되지 않도록 1로 설정해야합니다.
    )


def hello_world():
    print("hello world")

bash1 = BashOperator(bash_command="echo 'hello task 1'", task_id="hello_bash1", dag=dag)
bash2 = BashOperator(bash_command="echo 'hello task 2'", task_id="hello_bash2", dag=dag)
py = PythonOperator(python_callable=hello_world, task_id="py_hello", dag=dag)

[bash1, bash2] >> py

 

 

2. cron 기반 스케줄링

일반적으로 사용하는 cron 기반 스케줄링과 동일하다.

 

cron에서 각 요소가 허용하는 값은 아래와 같다.

시간 요일
0-59 0-23 1-31 1-12 또는 JAN-DEC 0-6 또는 SUN-SAT

 

cron에서 허용하는 특수문자와 그 의미는 아래와 같다.

* 모든 값 * * * * * (매분)
, 값 목록 1,15 * * * * (매시간 1분과 15분)
- 범위 0-5 * * * * (매시간 정각부터 5분까지 매분 실행)
/ 단계 값 */15 * * * * (15분마다)

 

 

cron example

0 0 * * * 매일 자정
*/5 * * * * 5분마다
0 20 * * 1-5 평일 오후 8시
0 0 1,15 * * 매월 1일과 15일 자정
0 9-17 * * * 매일 오전 9시부터 오후 5시까지 매시간

 

cron 기반 스케줄링 예제

import pendulum
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

tz = pendulum.timezone("Asia/Seoul") # dag 스케쥴링 기준 timezone을 설정합니다.

default_args = {
    'owner': 'kyeongseo.oh', # dag의 관리자를 명시합니다.
    'retries': 1, # 실패 시 1회 재시작 합니다.
    'retry_delay': timedelta(minutes=5), # 재시작 간격을 5분으로 설저합니다.
}

dag = DAG(
    dag_id="chapter_1", # dag id를 설정합니다.
    schedule="*/5 * * * *", # 5분에 한번씩 실행되도록 설정
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz), # dag가 처음으로 스케쥴되는 시간을 설정합니다.
    tags=["kyeongseo.oh", "every_5_minutes"], # dag의 tag를 생성합니다.
    catchup=False, # backfill을 하지 않도록 설정합니다.
    )


def hello_world():
    print("hello world")

bash1 = BashOperator(bash_command="echo 'hello task 1'", task_id="hello_bash1", dag=dag)
bash2 = BashOperator(bash_command="echo 'hello task 2'", task_id="hello_bash2", dag=dag)
py = PythonOperator(python_callable=hello_world, task_id="py_hello", dag=dag)

[bash1, bash2] >> py

 

3. 빈도 기반 스케줄링

python의 timedelta를 사용해 빈도 기반으로 스케줄링 할 수 있다.

5분마다 timedelta(minutes=5) DAG가 5분 간격으로 실행됩니다.
매시간 timedelta(hours=1) DAG가 1시간 간격으로 실행됩니다.
6시간마다 timedelta(hours=6) DAG가 6시간 간격으로 실행됩니다.
3일마다 timedelta(days=3) DAG가 3일 간격으로 실행됩니다.
매주 timedelta(weeks=1) DAG가 1주일 간격으로 실행됩니다.

 

빈도 기반 스케줄링 예제

import pendulum
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

tz = pendulum.timezone("Asia/Seoul") # dag 스케쥴링 기준 timezone을 설정합니다.

default_args = {
    'owner': 'kyeongseo.oh', # dag의 관리자를 명시합니다.
    'retries': 1, # 실패 시 1회 재시작 합니다.
    'retry_delay': timedelta(minutes=5), # 재시작 간격을 5분으로 설저합니다.
}

dag = DAG(
    dag_id="chapter_1", # dag id를 설정합니다.
    schedule=timedelta(minutes=1), # 1분에 한번씩 실행되도록 설정
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz), # dag가 처음으로 스케쥴되는 시간을 설정합니다.
    tags=["kyeongseo.oh", "every_5_minutes"], # dag의 tag를 생성합니다.
    catchup=False, # backfill을 하지 않도록 설정합니다.
    )


def hello_world():
    print("hello world")

bash1 = BashOperator(bash_command="echo 'hello task 1'", task_id="hello_bash1", dag=dag)
bash2 = BashOperator(bash_command="echo 'hello task 2'", task_id="hello_bash2", dag=dag)
py = PythonOperator(python_callable=hello_world, task_id="py_hello", dag=dag)

[bash1, bash2] >> py

댓글