본문 바로가기
aiflow

airflow ExternalTaskSensor와 TriggerDagRunOperator

by kyeongseo.oh 2024. 8. 17.

ExternalTaskSensor

DAG 간 의존성이 존재할 경우에 사용되는 sensor로 다른 DAG의 특정 태스크가 완료되었는 지를 확인하는 역할을 수행한다.

ExternalTaskSensor은 Logical Date를 기준으로 upstream DAG의 수행을 판단한다. 스케줄링된 시간이 다르거나, 수동으로 DAG를 트리거 한 경우에는 Sensor가 제대로 동작하지 않게 된다.

 

아래와 같이 DAG의 Logical Date가 동일한 경우에만 ExternalTaskSensor는 upstream DAG가 수행된 것으로 판단한다는 점에 주의하자.

dag1

 

external_task_sensor

 

ExternalTaskSensor 예제 코드

import pendulum
from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "sensor"],
)
def external_task_sensor():

    external_sensor = ExternalTaskSensor(task_id='wait_for_dag1',
                                        external_dag_id='dag1',
                                        external_task_id='task1',
                                        mode='reschedule',
                                        poke_interval=10,
                                        timeout=60,
                                        )

    external_sensor
    
external_task_sensor()

 

아래 그림은 ExternalTaskSensor와 Logical Date의 관계를 표현한 그림이다.

 

그렇다면 각 DAG의 스케줄이 달라 Logical Date이 다른 경우에는 어떻게 해야 할까?

이때는 execution_delta를 사용해 Logical Date의 간격을 지정할 수 있다.

아래 예제에서는 execution_delta=timedelta(hours=1)를 적용해 1시간 전에 dag1이 수행되었는 지 확인한다.

dag1
external_task_sensor

 

execution_delta 적용 예제

import pendulum
from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import timedelta

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "sensor"],
)
def external_task_sensor():

    external_sensor = ExternalTaskSensor(task_id='wait_for_dag1',
                                        external_dag_id='dag1',
                                        external_task_id='task1',
                                        mode='reschedule',
                                        poke_interval=10,
                                        timeout=60,
                                        execution_delta=timedelta(hours=1)
                                        )

    external_sensor

external_task_sensor()

 

아래 그림은 ExternalTaskSensor에 execution_delta를 적용했을 때를 표현한 그림이다.

TriggerDagRunOperator

DAG 간 의존성이 존재할 경우에 사용되는 operator로 다른 DAG를 트리거한다.

트리거된 DAG는 원래 스케줄과 별개로 실행된다.

 

trigger되는 DAG는 아래와 같이 스케줄링이 활성화 되어 있어야 한다. 그렇지 않으면 task가 실행되지 않는다.

 

아래는 TriggerDagRunOperator를 사용해 dag1를 실행시키는 예제이다. 사용 방법은 매우 간단하다.

trigger_dag_id에 트리거할 dag의 id만 입력해주면 된다.

 

아래 코드에서는 wait_for_completion을 True로 설정해 dag1이 종료되어야 trigger_dag1 task가 success 되도록 하여, dag1_finish_notify가 dag1 수행 종료 후에 수행되도록 설정했다. poke_interval은 dag1이 완료되었는 지를 확인하는 시간 간격이다.

import pendulum
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import timedelta

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "trigger"],
)
def trigger_dag():

    trigger_dag1 = TriggerDagRunOperator(
        task_id="trigger_dag1",
        trigger_dag_id="dag1",
        wait_for_completion=True,
        poke_interval=10,
    )

    @task
    def dag1_finish_notify():
        print("dag1 finished")

    trigger_dag1 >> dag1_finish_notify()

trigger_dag()


@dag(
    schedule=None, # trigger되는 DAG는 schedule이 필요없다.
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "trigger"],
)
def dag1():

    @task
    def task1():
        print("task1")
    
    task1()

dag1()

 

wait_for_completion이 True인 경우에는 아래와 같이 동작합니다.

 

wait_for_completion이 True인 경우에는 아래와 같이 동작합니다.

 

결론

DAG 1이 수행된 후에 DAG 2 와 DAG 3이 수행되어야 하는 경우에는 TriggerDagRunOperator 사용이 적절하고,

DAG 1과 DAG 2가 수행된 후에 DAG 3이 수행되어야 하는 경우에는 ExternalTaskSensor 사용이 적절하다.

댓글