airflow를 스케줄링하는 방법으로는 preset 기반, cron 기반, 빈도 기반 3가지가 있다.
1. preset 기반 스케줄링
프리셋은 cron 표현식의 대안으로 사용되며, 더 읽기 쉽고 이해하기 쉽다. 주요 프리셋들과 그 의미는 아래와 같다.
이름 | 의미 | 비고 |
None | 스케줄링하지 않고, 외부에서 트리거될 때만 실행한다. |
|
@once | 1번만 실행한다. | |
@continuous | 이전 실행이 끝나자마자 다시 실행한다. | max_active_runs=1 일때만 사용 가능 |
@hourly | 매 시 정각에 한번씩 실행한다. | |
@daily | 매일 자정에 1회 실행한다. | |
@weekly | 매주 일요일 자정에 1회 실행한다. | |
@monthly | 매월 1일 정각에 한번 실행한다. | |
@quarterly | 분기별로 1회 첫날 자정에 실행한다. | |
@yearly | 매년 1월 1일 자정에 1회 실행한다. |
프리셋 기반 스케줄링 예제
import pendulum
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
tz = pendulum.timezone("Asia/Seoul") # dag 스케쥴링 기준 timezone을 설정합니다.
default_args = {
'owner': 'kyeongseo.oh', # dag의 관리자를 명시합니다.
'retries': 1, # 실패 시 1회 재시작 합니다.
'retry_delay': timedelta(minutes=5), # 재시작 간격을 5분으로 설저합니다.
}
dag = DAG(
dag_id="chapter_1", # dag id를 설정합니다.
schedule="@continuous", # 이전 실행이 끝나자마자 바로 다시 실행한다.
start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz), # dag가 처음으로 스케쥴되는 시간을 설정합니다.
tags=["kyeongseo.oh", "continuous"], # dag의 tag를 생성합니다.
catchup=False, # backfill을 하지 않도록 설정합니다.
max_active_runs = 1 # @continuous을 사용할 때는 chapter_1 dag가 동시에 여러개 실행되지 않도록 1로 설정해야합니다.
)
def hello_world():
print("hello world")
bash1 = BashOperator(bash_command="echo 'hello task 1'", task_id="hello_bash1", dag=dag)
bash2 = BashOperator(bash_command="echo 'hello task 2'", task_id="hello_bash2", dag=dag)
py = PythonOperator(python_callable=hello_world, task_id="py_hello", dag=dag)
[bash1, bash2] >> py
2. cron 기반 스케줄링
일반적으로 사용하는 cron 기반 스케줄링과 동일하다.
cron에서 각 요소가 허용하는 값은 아래와 같다.
분 | 시간 | 일 | 월 | 요일 |
0-59 | 0-23 | 1-31 | 1-12 또는 JAN-DEC | 0-6 또는 SUN-SAT |
cron에서 허용하는 특수문자와 그 의미는 아래와 같다.
* | 모든 값 | * * * * * (매분) |
, | 값 목록 | 1,15 * * * * (매시간 1분과 15분) |
- | 범위 | 0-5 * * * * (매시간 정각부터 5분까지 매분 실행) |
/ | 단계 값 | */15 * * * * (15분마다) |
cron example
0 0 * * * | 매일 자정 |
*/5 * * * * | 5분마다 |
0 20 * * 1-5 | 평일 오후 8시 |
0 0 1,15 * * | 매월 1일과 15일 자정 |
0 9-17 * * * | 매일 오전 9시부터 오후 5시까지 매시간 |
cron 기반 스케줄링 예제
import pendulum
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
tz = pendulum.timezone("Asia/Seoul") # dag 스케쥴링 기준 timezone을 설정합니다.
default_args = {
'owner': 'kyeongseo.oh', # dag의 관리자를 명시합니다.
'retries': 1, # 실패 시 1회 재시작 합니다.
'retry_delay': timedelta(minutes=5), # 재시작 간격을 5분으로 설저합니다.
}
dag = DAG(
dag_id="chapter_1", # dag id를 설정합니다.
schedule="*/5 * * * *", # 5분에 한번씩 실행되도록 설정
start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz), # dag가 처음으로 스케쥴되는 시간을 설정합니다.
tags=["kyeongseo.oh", "every_5_minutes"], # dag의 tag를 생성합니다.
catchup=False, # backfill을 하지 않도록 설정합니다.
)
def hello_world():
print("hello world")
bash1 = BashOperator(bash_command="echo 'hello task 1'", task_id="hello_bash1", dag=dag)
bash2 = BashOperator(bash_command="echo 'hello task 2'", task_id="hello_bash2", dag=dag)
py = PythonOperator(python_callable=hello_world, task_id="py_hello", dag=dag)
[bash1, bash2] >> py
3. 빈도 기반 스케줄링
python의 timedelta를 사용해 빈도 기반으로 스케줄링 할 수 있다.
5분마다 | timedelta(minutes=5) | DAG가 5분 간격으로 실행됩니다. |
매시간 | timedelta(hours=1) | DAG가 1시간 간격으로 실행됩니다. |
6시간마다 | timedelta(hours=6) | DAG가 6시간 간격으로 실행됩니다. |
3일마다 | timedelta(days=3) | DAG가 3일 간격으로 실행됩니다. |
매주 | timedelta(weeks=1) | DAG가 1주일 간격으로 실행됩니다. |
빈도 기반 스케줄링 예제
import pendulum
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
tz = pendulum.timezone("Asia/Seoul") # dag 스케쥴링 기준 timezone을 설정합니다.
default_args = {
'owner': 'kyeongseo.oh', # dag의 관리자를 명시합니다.
'retries': 1, # 실패 시 1회 재시작 합니다.
'retry_delay': timedelta(minutes=5), # 재시작 간격을 5분으로 설저합니다.
}
dag = DAG(
dag_id="chapter_1", # dag id를 설정합니다.
schedule=timedelta(minutes=1), # 1분에 한번씩 실행되도록 설정
start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz), # dag가 처음으로 스케쥴되는 시간을 설정합니다.
tags=["kyeongseo.oh", "every_5_minutes"], # dag의 tag를 생성합니다.
catchup=False, # backfill을 하지 않도록 설정합니다.
)
def hello_world():
print("hello world")
bash1 = BashOperator(bash_command="echo 'hello task 1'", task_id="hello_bash1", dag=dag)
bash2 = BashOperator(bash_command="echo 'hello task 2'", task_id="hello_bash2", dag=dag)
py = PythonOperator(python_callable=hello_world, task_id="py_hello", dag=dag)
[bash1, bash2] >> py
'aiflow' 카테고리의 다른 글
airflow connection과 hook (0) | 2024.08.10 |
---|---|
airflow task context: jinja template, get_current_context (0) | 2024.08.10 |
airflow catchup & backfill (0) | 2024.08.10 |
airflow 실행 날짜 이해: DAG 실행 시간과 execution_date 비교 (0) | 2024.08.09 |
airflow dag의 구조 (0) | 2024.06.22 |
댓글