catchup은 dag가 활성화될 때 자동으로 과거의 미실행 간격을 실행하는 기능이다.
catchup을 True로 설정한 경우 dag를 일시적으로 비활성화했다가 다시 활성화하면 start_date 이후로 그 동안 실행되지 않았던 dag들이 자동으로 실행된다.
backfill은 특정 기간의 과거 실행을 수동으로 트리거하는 기능으로 start_date 이전의 데이터를 처리하기 원할 때 사용한다.
backfill은 cli를 통해 수행된다.
catchup=True example
start_date와 현재 시간 사이에서 실행되지 않았던 dag들이 한번에 모두 실행된다.
airflow의 설정값인 max_active_runs_per_dag(default 16) 값만큼 동시에 실행되기에 서버 부하가 발생할 수 있다.
이를 방지하기 위해 max_dag_runs 설정을 추가해 동시에 실행되는 dag_run을 제어해야 한다.
import pendulum
from airflow.decorators import dag, task
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.python import get_current_context
@dag(
schedule=timedelta(minutes=5),
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
end_date=pendulum.datetime(2024, 8, 10, 4, tz=pendulum.timezone("Asia/Seoul")),
catchup=True,
max_active_runs=2, # 한번에 실행되는 dag의 수를 2개로 제한
tags=["kyeongseo.oh", "catchup_true"],
)
def catch_up_true():
@task()
def print_context():
context = get_current_context()
for k, v in context.items():
print(f"{k} : {v}")
print_context()
catch_up_true()
catchup=False example
catchup을 비활성화하고 dag를 2024-08-10 03:14:00에 트리거한 경우를 예로 들어 설명하면, 03:10에 실행되었어야 하는 dag가 03:14에 실행되고, 그 이후로는 기존 스케줄 간격대로 dag가 실행된다. 이때 03:14에 실행된 dag는 03:05에 예약된 dag로 03:10에 실행되었어야 하는 dag이다.
import pendulum
from airflow.decorators import dag, task
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.python import get_current_context
@dag(
schedule=timedelta(minutes=5),
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
end_date=pendulum.datetime(2024, 8, 10, 4, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["kyeongseo.oh", "catchup_false"],
)
def catch_up_false():
@task()
def print_context():
context = get_current_context()
for k, v in context.items():
print(f"{k} : {v}")
print_context()
catch_up_false()
backfill example
Celery Executor를 사용하는 경우 redis 혹은 rabbitmq가 필요하다.
redis/rabbitmq가 없는 standalone airflow는 LocalExecutor를 사용하여 작업을 실행하기 위해 -l 옵션을 사용해야 한다.
backfill의 경우도 catchup=True와 마찬가지로 max_active_runs_per_dag 값만큼 동시에 dag가 실행되고,
dag에 max_dag_runs가 설정된 경우에는 이 설정을 따른다. backfill cli는 별도로 동시에 실행되는 dag_run을 제어할 방법이 없기에, backfill하는 dag에는 max_dag_runs를 선언해두는 것이 좋다.
airflow dags backfill \
--start-date "2024-08-010T00:00:00+09:00" \
--end-date "2024-08-01T00:20:00+09:00" \
catch_up_false \
-l
'aiflow' 카테고리의 다른 글
airflow connection과 hook (0) | 2024.08.10 |
---|---|
airflow task context: jinja template, get_current_context (0) | 2024.08.10 |
airflow 실행 날짜 이해: DAG 실행 시간과 execution_date 비교 (0) | 2024.08.09 |
airflow 스케줄링 : preset, cron, 빈도 기반 (0) | 2024.08.09 |
airflow dag의 구조 (0) | 2024.06.22 |
댓글