본문 바로가기
aiflow

airflow Dynamic Task Mapping

by kyeongseo.oh 2024. 8. 20.

1. Dynamic Task Mapping 개요

  • 런타임에 task의 수와 argument를 동적으로 결정할 수 있다.
  • 병렬 처리를 통해 처리 시간을 단축할 수 있다.
  • 과도한 병렬 처리로 시스템 부하를 유발할 수 있으므로, 적절한 max_active_tasks 설정이 필요하다.
  • 각 task에 다른 argument를 전달할 수 있다.
  • mapreduce의 map과 비슷한 개념이라고 볼 수 있다.

 

2. 주요 개념

2-1. expand

입력받은 argument를 각 task에 동적으로 매핑해 여러 task instance를 생성한다.

아래는 expand를 사용한 간단한 예제이다.

@task
def return_x(x):
    return x

x_values = return_x.expand(x=[1, 2, 3])

 

 

위의 task는 아래와 같이 동적으로 확장되어, 3개의 개별 task instance가 실행된다.

return_x(x=1)
return_x(x=2)
return_x(x=3)

 

 

2-2. partial

입력받은 argument를 모든 task에 동일하게 적용한다.

아래는 partial을 사용한 간단한 예제이다.

@task
def return_x_y(x, y):
    return x, y

x_y_values = return_x_y.partial(y=5).expand(x=[1, 2, 3])

 

위의 예제는 partial(y=5)에서 y를 5로 고정하고, expand를 통해 x에 대해 동적 매핑을 수행해 아래와 같은 3개의 개별 task instance가 실행된다.

return_x_y(x=1, y=5)
return_x_y(x=2, y=5)
return_x_y(x=3, y=5)

 

2-3. LazySelectSequence

spark의 RDD와 유사한 lazy evaluation 개념으로 요청시에만 개별 값을 로드해 메모리 활용을 최적화한다.

for loop, indexing을 통해 개별 값에 접근할 수 있고, list 함수를 사용해 list type으로 변환할 수 있다.

하지만 list type으로 변환 시 LazySelectSequence의 모든 항목이 즉시 메모리에 로드되므로, 사용에 유의해야 한다.

 

3. 사용 예제

아래는 expand()를 사용해 9개의 피자 조합(3가지 토핑 * 3가지 크기)을 만들고, partial().expand()를 사용해 도우 타입을 씬으로 고정한 9개의 피자 조합을 만드는 예제로 package_pizzas task에서 집계 처리된다. max_active_tasks=3 설정으로 동시 실행 태스크 수는 3개로 제한된다.

import pendulum
from airflow.decorators import dag, task
from airflow.datasets import Dataset

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    max_active_tasks=3,
    tags=["example", "Dynamic Task Mapping"],
)
def pizza_making_dag():

    @task
    def make_pizza(topping, size, dough="일반"):
        return f"{size} {dough} {topping} 피자 완성"

    
    # expand()만 사용
    pizzas_expand = make_pizza.expand(
        topping=["페퍼로니", "치즈", "하와이안"],
        size=["스몰", "미디움", "라지"]
    )
    
    # partial()과 함께 사용
    pizzas_partial = make_pizza.partial(dough="씬").expand(
        topping=["페퍼로니", "치즈", "하와이안"],
        size=["스몰", "미디움", "라지"]
    )

    @task
    def package_pizzas(pizzas):
        for pizza in pizzas:
            print(f"포장 완료: {pizza}")

    package_pizzas(pizzas_expand)
    package_pizzas(pizzas_partial)

pizza_making_dag()

 

  • 생성된 동적 task의 수만큼 taskName[n]으로 표시된다.

 

 

  • Mapped Tasks tab에서 각 동적 task의 실행 결과를 로그를 확인할 수 있다.

 

  • expand()를 사용한 package_pizzas의 로그
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 일반 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 일반 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 일반 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 일반 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 일반 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 일반 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 일반 하와이안 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 일반 하와이안 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 일반 하와이안 피자 완성

 

  • partial().expand()를 사용한 package_pizzas의 로그
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 씬 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 씬 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 씬 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 씬 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 씬 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 씬 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 씬 하와이안 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 씬 하와이안 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 씬 하와이안 피자 완성

댓글