본문 바로가기
aiflow

airflow catchup & backfill

by kyeongseo.oh 2024. 8. 10.

catchup은 dag가 활성화될 때 자동으로 과거의 미실행 간격을 실행하는 기능이다.

catchup을 True로 설정한 경우 dag를 일시적으로 비활성화했다가 다시 활성화하면 start_date 이후로 그 동안 실행되지 않았던 dag들이 자동으로 실행된다. 

 

backfill은 특정 기간의 과거 실행을 수동으로 트리거하는 기능으로 start_date 이전의 데이터를 처리하기 원할 때 사용한다.

backfill은 cli를 통해 수행된다.

 

catchup=True example

start_date와 현재 시간 사이에서 실행되지 않았던 dag들이 한번에 모두 실행된다.

airflow의 설정값인 max_active_runs_per_dag(default 16) 값만큼 동시에 실행되기에 서버 부하가 발생할 수 있다.

이를 방지하기 위해 max_dag_runs 설정을 추가해 동시에 실행되는 dag_run을 제어해야 한다.

import pendulum
from airflow.decorators import dag, task
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.python import get_current_context

@dag(
    schedule=timedelta(minutes=5),
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    end_date=pendulum.datetime(2024, 8, 10, 4, tz=pendulum.timezone("Asia/Seoul")),
    catchup=True,
    max_active_runs=2, # 한번에 실행되는 dag의 수를 2개로 제한
    tags=["kyeongseo.oh", "catchup_true"],
)

def catch_up_true():

    @task()
    def print_context():
        context = get_current_context()

        for k, v in context.items():
            print(f"{k} : {v}")
        
    print_context()

catch_up_true()

 

 

catchup=False example

catchup을 비활성화하고 dag를 2024-08-10 03:14:00에 트리거한 경우를 예로 들어 설명하면, 03:10에 실행되었어야 하는 dag가 03:14에 실행되고, 그 이후로는 기존 스케줄 간격대로 dag가 실행된다. 이때 03:14에 실행된 dag는 03:05에 예약된 dag로 03:10에 실행되었어야 하는 dag이다.

 

import pendulum
from airflow.decorators import dag, task
from datetime import timedelta
import time
from airflow import DAG
from airflow.operators.python import get_current_context

@dag(
    schedule=timedelta(minutes=5),
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    end_date=pendulum.datetime(2024, 8, 10, 4, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["kyeongseo.oh", "catchup_false"],
)

def catch_up_false():

    @task()
    def print_context():
        context = get_current_context()

        for k, v in context.items():
            print(f"{k} : {v}")
        
    print_context()

catch_up_false()

 

 

backfill example

Celery Executor를 사용하는 경우 redis 혹은 rabbitmq가 필요하다.

redis/rabbitmq가 없는 standalone airflow는 LocalExecutor를 사용하여 작업을 실행하기 위해 -l 옵션을 사용해야 한다.

backfill의 경우도 catchup=True와 마찬가지로 max_active_runs_per_dag 값만큼 동시에 dag가 실행되고,

dag에 max_dag_runs가 설정된 경우에는 이 설정을 따른다. backfill cli는 별도로 동시에 실행되는 dag_run을 제어할 방법이 없기에, backfill하는 dag에는  max_dag_runs를 선언해두는 것이 좋다.

airflow dags backfill \
    --start-date "2024-08-010T00:00:00+09:00" \
    --end-date "2024-08-01T00:20:00+09:00" \
    catch_up_false \
    -l

 

댓글