task 의존성은 DAG(Directed Acyclic Graph) 내에서 task 간의 실행 순서와 관계를 정의하는 개념으로 airflow에서는
비트시프트 연산자(>>)를 사용해 task 간의 의존성을 설정할 수 있다.
task 의존성의 패턴으로는 선형 의존성, fan-out, fan-in, 조건부 의존성이 있다.
1. 선형 의존성
import pendulum
from airflow.decorators import dag, task
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["kyeongseo.oh", "task_dependency"],
)
def task_dependency():
@task
def task1():
print("1")
@task
def task2():
print("2")
@task
def task3():
print("3")
task1() >> task2() >> task3()
task_dependency()
2. 팬아웃 (Fan-out)
task1() >> [task2(), task3()]
3. 팬인 (Fan-in):
[task1(), task2()] >> task3()
아래와 같이 선형, 팬인, 팬아웃을 함께 활용한 dag를 설계할 수 도 있다.
from airflow.decorators import dag, task
import pendulum
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["kyeongseo.oh", "task_dependency"],
)
def task_dependency():
@task
def task1():
print("1")
@task
def task2():
print("2")
@task
def task3():
print("3")
@task
def task2_1():
print("2_1")
@task
def task3_1():
print("3_1")
@task
def task4():
print("4")
t1 = task1()
t2, t3 = task2(), task3()
t2_1, t3_1 = task2_1(), task3_1()
t4 = task4()
t1 >> [t2, t3]
t2 >> t2_1
t3 >> t3_1
[t2_1, t3_1] >> t4
task_dependency()
4. 조건부 의존성
특정 조건에 따라 실행해야 하는 task가 다른 경우, 조건부 의존성을 가진 dag를 생성할 수 있다.
BranchPythonOperator를 사용하거나, taskflow의 task.branch 데코레이터를 사용하면 조건부 의존성을 구현할 수 있다.
BranchPythonOperator 사용
조건에 따라 실행할 task_id를 return하면 해당 task가 실행되고, 해당하지 않는 task는 skip된다.
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
import pendulum
import random
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "branching"],
)
def branch_example():
# 조건에 따라 실행할 task의 id를 return하는 함수
def branch_func():
task_id = random.choice(['branch_1', 'branch_2'])
if task_id == "branch_1":
return "branch_1"
else:
return "branch_2"
@task
def start():
return "start"
# branch_func 함수가 return하는 task_id를 실행한다.
branch_op = BranchPythonOperator(
task_id='branch_task',
python_callable=branch_func,
)
@task
def branch_1():
print("branch_1")
@task
def branch_2():
print("branch_1")
@task
def end():
print("end")
start() >> branch_op >> [branch_1(), branch_2()] >> end()
branch_example()
아래와 같이 branch_2가 skipped된 것을 확인할 수 있다. 여기서 문제는 end task 또한 skipped 되었다는 것이다.
이는 airflow가 부모 task가 success일 때만 task가 실행되도록 설계되어 있기 때문으로, trigger rule을 조절하면 이를 해결할 수 있다.
BranchPythonOperator + trigger rule 사용
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
import pendulum
import random
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "branching"],
)
def branch_example():
def branch_func():
task_id = random.choice(['branch_1', 'branch_2'])
if task_id == "branch_1":
return "branch_1"
else:
return "branch_2"
@task
def start():
return "start"
branch_op = BranchPythonOperator(
task_id='branch_task',
python_callable=branch_func,
)
@task
def branch_1():
print("branch_1")
@task
def branch_2():
print("branch_1")
@task(trigger_rule=TriggerRule.ONE_SUCCESS)
def end():
print("end")
start() >> branch_op >> [branch_1(), branch_2()] >> end()
branch_example()
trigger rule을 one_success로 수정해 end task가 정상적으로 성공했다.
trigger rule에 대한 자세한 내용은 다음에 설명한다.
taskflow 데코레이터 사용
@task.branch decorator를 사용해 조건부 의존성을 설정한다. 위와 동일한 dag graph가 생성된다.
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
import pendulum
import random
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "branching"],
)
def branch_example():
def branch_func():
task_id = random.choice(['branch_1', 'branch_2'])
if task_id == "branch_1":
return "branch_1"
else:
return "branch_2"
@task
def start():
return "start"
@task.branch
def branch_op():
return branch_func()
@task
def branch_1():
print("branch_1")
@task
def branch_2():
print("branch_1")
@task(trigger_rule=TriggerRule.ONE_SUCCESS)
def end():
print("end")
start() >> branch_op() >> [branch_1(), branch_2()] >> end()
branch_example()
'aiflow' 카테고리의 다른 글
airflow Xcom (0) | 2024.08.11 |
---|---|
airflow trigger rule (1) | 2024.08.11 |
airflow connection과 hook (0) | 2024.08.10 |
airflow task context: jinja template, get_current_context (0) | 2024.08.10 |
airflow catchup & backfill (0) | 2024.08.10 |
댓글