1. Dynamic Task Mapping 개요
- 런타임에 task의 수와 argument를 동적으로 결정할 수 있다.
- 병렬 처리를 통해 처리 시간을 단축할 수 있다.
- 과도한 병렬 처리로 시스템 부하를 유발할 수 있으므로, 적절한 max_active_tasks 설정이 필요하다.
- 각 task에 다른 argument를 전달할 수 있다.
- mapreduce의 map과 비슷한 개념이라고 볼 수 있다.
2. 주요 개념
2-1. expand
입력받은 argument를 각 task에 동적으로 매핑해 여러 task instance를 생성한다.
아래는 expand를 사용한 간단한 예제이다.
@task
def return_x(x):
return x
x_values = return_x.expand(x=[1, 2, 3])
위의 task는 아래와 같이 동적으로 확장되어, 3개의 개별 task instance가 실행된다.
return_x(x=1)
return_x(x=2)
return_x(x=3)
2-2. partial
입력받은 argument를 모든 task에 동일하게 적용한다.
아래는 partial을 사용한 간단한 예제이다.
@task
def return_x_y(x, y):
return x, y
x_y_values = return_x_y.partial(y=5).expand(x=[1, 2, 3])
위의 예제는 partial(y=5)에서 y를 5로 고정하고, expand를 통해 x에 대해 동적 매핑을 수행해 아래와 같은 3개의 개별 task instance가 실행된다.
return_x_y(x=1, y=5)
return_x_y(x=2, y=5)
return_x_y(x=3, y=5)
2-3. LazySelectSequence
spark의 RDD와 유사한 lazy evaluation 개념으로 요청시에만 개별 값을 로드해 메모리 활용을 최적화한다.
for loop, indexing을 통해 개별 값에 접근할 수 있고, list 함수를 사용해 list type으로 변환할 수 있다.
하지만 list type으로 변환 시 LazySelectSequence의 모든 항목이 즉시 메모리에 로드되므로, 사용에 유의해야 한다.
3. 사용 예제
아래는 expand()를 사용해 9개의 피자 조합(3가지 토핑 * 3가지 크기)을 만들고, partial().expand()를 사용해 도우 타입을 씬으로 고정한 9개의 피자 조합을 만드는 예제로 package_pizzas task에서 집계 처리된다. max_active_tasks=3 설정으로 동시 실행 태스크 수는 3개로 제한된다.
import pendulum
from airflow.decorators import dag, task
from airflow.datasets import Dataset
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
max_active_tasks=3,
tags=["example", "Dynamic Task Mapping"],
)
def pizza_making_dag():
@task
def make_pizza(topping, size, dough="일반"):
return f"{size} {dough} {topping} 피자 완성"
# expand()만 사용
pizzas_expand = make_pizza.expand(
topping=["페퍼로니", "치즈", "하와이안"],
size=["스몰", "미디움", "라지"]
)
# partial()과 함께 사용
pizzas_partial = make_pizza.partial(dough="씬").expand(
topping=["페퍼로니", "치즈", "하와이안"],
size=["스몰", "미디움", "라지"]
)
@task
def package_pizzas(pizzas):
for pizza in pizzas:
print(f"포장 완료: {pizza}")
package_pizzas(pizzas_expand)
package_pizzas(pizzas_partial)
pizza_making_dag()
- 생성된 동적 task의 수만큼 taskName[n]으로 표시된다.
- Mapped Tasks tab에서 각 동적 task의 실행 결과를 로그를 확인할 수 있다.
- expand()를 사용한 package_pizzas의 로그
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 일반 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 일반 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 일반 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 일반 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 일반 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 일반 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 일반 하와이안 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 일반 하와이안 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 일반 하와이안 피자 완성
- partial().expand()를 사용한 package_pizzas의 로그
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 씬 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 씬 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 씬 페퍼로니 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 씬 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 씬 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 씬 치즈 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 스몰 씬 하와이안 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 미디움 씬 하와이안 피자 완성
[2024-08-31, 22:27:33 UTC] {logging_mixin.py:190} INFO - 포장 완료: 라지 씬 하와이안 피자 완성
'aiflow' 카테고리의 다른 글
airflow에서 remote spark cluster에 job submit 하기: SparkSubmitOperator (0) | 2024.09.03 |
---|---|
airflow Variable (0) | 2024.09.02 |
airflow Data-aware scheduling과 Dataset (0) | 2024.08.17 |
airflow Params (0) | 2024.08.17 |
airflow 외부 시스템 이용하기 -2: ImpalaHook, S3Hook (0) | 2024.08.17 |
댓글