본문 바로가기
aiflow

airflow trigger rule

by kyeongseo.oh 2024. 8. 11.

Trigger Rule은 선행 태스크들의 상태에 따라 현재 태스크의 실행 여부를 결정하는 규칙이다.

기본적으로 Airflow는 모든 직접적인 상위 태스크가 성공적으로 완료되어야 다음 태스크를 실행한다.

하지만 Trigger Rule을 사용하면 이 기본 동작을 변경할 수 있어 좀 더 복잡한 dag를 설계할 수 있다.

 

Airflow에서 제공하는 Trigger Rule의 종류는 아래와 같다. 아래에서 말하는 상위 task는 현재 task와 직접적으로 연결된 이전 태스크를 의미한다.

Trigger Rule Description
all_success 상위 task가 모두 성공하면 실행
all_failed 상위 task가 모두 실패하면 실행
all_done 상위 task가 모두 완료되면 실행 (성공 / 실패 여부 무관)
all_skipped 상위 task가 모두 skipped 상태면 실행
one_failed 하나 이상의 상위 task가 실패하면 바로 실행
one_success 하나 이상의 상위 task가 성공하면 바로 실행
one_done 상위 task 중 하나 이상 완료되면 바로 실행
none_failed 상위 task 중 실패가 없으면 실행
none_failed_min_one_success 상위 task 중 실패가 없고, 성공한 task가 하나 이상이면 실행
none_skipped 상위 task중 skipped가 없으면 실행 (성공 / 실패 여부 무관)
always 항상 실행

 

 

1. all_sucess

Trigger rule의 default 값은 all_success이다. 자동으로 all_success가 적용된다.

 

from airflow.decorators import dag, task
import pendulum

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "trigger_rule"],
)
def trigger_rule():

    @task
    def task1():
        raise

    @task
    def task2():
        print("task2")

    @task
    def task3():
        print("3")

    [task1(), task2()] >> task3()

trigger_rule()

 

 

2. all_done

trigger_rule은 task별로 적용이 가능하다. task3에는 ALL_DONE을 task4는 ONE_FAILED를 적용했다.

task3의 부모 task는 success, failed로 task가 모두 종료되어 task3가 정상적으로 실행되었고,

task4의 부모 task인 task3은 success로 task4의 trigger_rule인 ONE_FAILED에 충족하지 않아 task4는 skipped 상태가 되었다.

from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule
import pendulum

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "trigger_rule"],
)
def trigger_rule():

    @task
    def task1():
        raise

    @task
    def task2():
        print("task2")

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def task3():
        print("3")

    @task(trigger_rule=TriggerRule.ONE_FAILED)
    def task4():
        print("4")

    [task1(), task2()] >> task3() >> task4()

trigger_rule()

'aiflow' 카테고리의 다른 글

airflow Taskflow api: @dag, @task  (0) 2024.08.11
airflow Xcom  (0) 2024.08.11
airflow task 의존성  (0) 2024.08.11
airflow connection과 hook  (0) 2024.08.10
airflow task context: jinja template, get_current_context  (0) 2024.08.10

댓글