본문 바로가기
aiflow

airflow task 의존성

by kyeongseo.oh 2024. 8. 11.

task 의존성은 DAG(Directed Acyclic Graph) 내에서 task 간의 실행 순서와 관계를 정의하는 개념으로 airflow에서는

비트시프트 연산자(>>)를 사용해 task 간의 의존성을 설정할 수 있다.

task 의존성의 패턴으로는 선형 의존성, fan-out, fan-in, 조건부 의존성이 있다.

 

1. 선형 의존성

import pendulum
from airflow.decorators import dag, task

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["kyeongseo.oh", "task_dependency"],
)

def task_dependency():

    @task
    def task1():
        print("1")
    @task
    def task2():
        print("2")
    @task
    def task3():
        print("3")

    task1() >> task2() >> task3()

task_dependency()

 

 

2. 팬아웃 (Fan-out)

task1() >> [task2(), task3()]

 

 

3. 팬인 (Fan-in):

[task1(), task2()] >> task3()
 

 

 

아래와 같이 선형, 팬인, 팬아웃을 함께 활용한 dag를 설계할 수 도 있다.

from airflow.decorators import dag, task
import pendulum

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["kyeongseo.oh", "task_dependency"],
)
def task_dependency():

    @task
    def task1():
        print("1")

    @task
    def task2():
        print("2")

    @task
    def task3():
        print("3")

    @task
    def task2_1():
        print("2_1")

    @task
    def task3_1():
        print("3_1")
    
    @task
    def task4():
        print("4")

    t1 = task1()
    t2, t3 = task2(), task3()
    t2_1, t3_1 = task2_1(), task3_1()
    t4 = task4()

    t1 >> [t2, t3]
    t2 >> t2_1
    t3 >> t3_1
    [t2_1, t3_1] >> t4

task_dependency()
 
 

4. 조건부 의존성

특정 조건에 따라 실행해야 하는 task가 다른 경우, 조건부 의존성을 가진 dag를 생성할 수 있다.

BranchPythonOperator를 사용하거나,  taskflow의 task.branch 데코레이터를 사용하면 조건부 의존성을 구현할 수 있다.

 

BranchPythonOperator 사용

조건에 따라 실행할 task_id를 return하면 해당 task가 실행되고, 해당하지 않는 task는 skip된다.

from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
import pendulum
import random

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "branching"],
)
def branch_example():

    # 조건에 따라 실행할 task의 id를 return하는 함수
    def branch_func():
        task_id = random.choice(['branch_1', 'branch_2'])

        if task_id == "branch_1":
            return "branch_1"
        else:
            return "branch_2"

    @task
    def start():
        return "start"

	# branch_func 함수가 return하는 task_id를 실행한다.
    branch_op = BranchPythonOperator(
        task_id='branch_task',
        python_callable=branch_func,
    )

    @task
    def branch_1():
        print("branch_1")

    @task
    def branch_2():
        print("branch_1")

    @task
    def end():
        print("end")

    start() >> branch_op >> [branch_1(), branch_2()] >> end()

branch_example()

 

아래와 같이 branch_2가 skipped된 것을 확인할 수 있다. 여기서 문제는 end task 또한 skipped 되었다는 것이다.

이는 airflow가 부모 task가 success일 때만 task가 실행되도록 설계되어 있기 때문으로, trigger rule을 조절하면 이를 해결할 수 있다.

BranchPythonOperator + trigger rule 사용

from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
import pendulum
import random

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "branching"],
)
def branch_example():

    def branch_func():
        task_id = random.choice(['branch_1', 'branch_2'])

        if task_id == "branch_1":
            return "branch_1"
        else:
            return "branch_2"

    @task
    def start():
        return "start"

    branch_op = BranchPythonOperator(
        task_id='branch_task',
        python_callable=branch_func,
    )

    @task
    def branch_1():
        print("branch_1")

    @task
    def branch_2():
        print("branch_1")

    @task(trigger_rule=TriggerRule.ONE_SUCCESS)
    def end():
        print("end")

    start() >> branch_op >> [branch_1(), branch_2()] >> end()

branch_example()

 

trigger rule을 one_success로 수정해 end task가 정상적으로 성공했다.

trigger rule에 대한 자세한 내용은 다음에 설명한다.

 

taskflow 데코레이터 사용

@task.branch decorator를 사용해 조건부 의존성을 설정한다. 위와 동일한 dag graph가 생성된다.

from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
import pendulum
import random

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "branching"],
)
def branch_example():

    def branch_func():
        task_id = random.choice(['branch_1', 'branch_2'])

        if task_id == "branch_1":
            return "branch_1"
        else:
            return "branch_2"

    @task
    def start():
        return "start"

    @task.branch
    def branch_op():
        return branch_func()

    @task
    def branch_1():
        print("branch_1")

    @task
    def branch_2():
        print("branch_1")

    @task(trigger_rule=TriggerRule.ONE_SUCCESS)
    def end():
        print("end")

    start() >> branch_op() >> [branch_1(), branch_2()] >> end()

branch_example()

'aiflow' 카테고리의 다른 글

airflow Xcom  (0) 2024.08.11
airflow trigger rule  (1) 2024.08.11
airflow connection과 hook  (0) 2024.08.10
airflow task context: jinja template, get_current_context  (0) 2024.08.10
airflow catchup & backfill  (0) 2024.08.10

댓글