1. Airflow Params 개요
- DAG와 태스크에 동적으로 값을 전달하는 방법이다.
- 런타임에 추가 Params를 제공하거나 Params 값을 덮어쓸 수 있어 DAG의 유연성이 높아진다.
- 스케줄링된 DAG는 Params에 설정된 기본값을 사용한다.
- DAG를 수동으로 시작하면 Params를 수정해 DAG를 실행할 수 있다.
- 동일한 DAG를 다른 파라미터로 여러 번 실행할 수 있어 DAG의 재사용성이 높아진다.
2. Params를 사용 사례
- 특정 날짜의 데이터를 재처리 하는 CASE가 있는 경우, 날짜를 Params로 지정한다.
- 개발 / 테스트 / 운영 환경에 따라 다른 database를 사용하는 경우, 연결 정보를 Params로 지정한다.
- 파라미터를 변경하며, 성능 테스트를 진행하는 경우, 파라미터를 Params로 지정한다.
- 메일링 리스트에 인원을 추가해야하는 경우, 이메일 주소를 Params로 지정한다.
3. Params 정의 방법
Airflow DAG에서 Params를 정의하는 방법은 다음과 같다.
3-1. 기본 구조
- Params는 DAG 데코레이터의
params
인자에 딕셔너리 형태로 정의한다. - 각 파라미터는 키-값 쌍으로, 값은
Param
객체로 설정하는 방식이다.
3-2. Param
객체
airflow.models.param.Param
클래스를 사용하여 생성하는 객체이다.- 파라미터의 타입, 기본값, 제약 조건 등을 지정할 수 있다.
3-3. 파라미터 정의 예시
import pendulum
from airflow.decorators import dag, task
from airflow.models.param import Param
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
params= {
"p1" : Param(default=5, type="integer", minimum=3, maximum=10),
"p2" : Param(5, type=["null", "number", "string"]),
"p3": Param("foo", enum=["foo", "bar", 5]),
"p4" : Param(f"{pendulum.now().date()}", type="string", format="date"),
"p5" : Param({"key": "value"}, type=["object", "null"])
},
tags=["example", "Param"],
)
- 정수형 파라미터 (p1):
- 기본값이 5인 정수형 파라미터이다.
- 최소값 3, 최대값 10의 범위 제한이 있는 파라미터이다.
- 다중 타입 파라미터 (p2):
- 기본값이 5이며, null, 숫자, 문자열 타입을 모두 허용하는 파라미터이다.
- 열거형 파라미터 (p3):
- 기본값이 "foo"이며, "foo", "bar", 5 중 하나의 값만 선택 가능한 파라미터이다.
- 날짜 형식 파라미터 (p4):
- 현재 날짜를 기본값으로 가지는 문자열 타입 파라미터이다.
- 날짜 형식으로 지정된 파라미터이다.
- 객체 또는 null 파라미터 (p5):
- 기본값으로 딕셔너리를 가지는 파라미터이다.
- 객체(딕셔너리) 또는 null 값을 허용하는 파라미터이다.
4. Params 사용 방법
Jinja 템플릿을 통해 Params의 값을 추출해 사용할 수 있다. {{ params.parameter_name }} 와 같은 형태로 사용한다.
import pendulum
from airflow.decorators import dag, task
from airflow.models.param import Param
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
params= {
"p1" : Param(default=5, type="integer", minimum=3, maximum=10),
"p2" : Param(5, type=["null", "number", "string"]),
"p3": Param("foo", enum=["foo", "bar", 5]),
"p4" : Param(f"{pendulum.now().date()}", type="string", format="date"),
"p5" : Param({"key": "value"}, type=["object", "null"])
},
tags=["example", "Param"],
)
def param_test():
@task
def print_param(**kwargs):
params = kwargs["params"]
print(params["p1"])
print(params["p2"])
print(params["p3"])
print(params["p4"])
print(params["p5"])
print_param()
param_test()
5. DAG에 값 전달하기
Trigger DAG 버튼을 클릭해 수동으로 DAG를 실행하면 아래와 같이 Params 값을 설정할 수 있는 UI가 표시되고, 값을 설정하면 해당 값이 DAG에 적용된다.
사용자가 제공한 값이 검증을 통과하지 못하면 아래와 같이 경고가 표시된다.
'aiflow' 카테고리의 다른 글
airflow Dynamic Task Mapping (0) | 2024.08.20 |
---|---|
airflow Data-aware scheduling과 Dataset (0) | 2024.08.17 |
airflow 외부 시스템 이용하기 -2: ImpalaHook, S3Hook (0) | 2024.08.17 |
airflow 외부 시스템 이용하기 -1 : DockerOperator, KubernetesPodOperator, SparkKubernetesOperator(SKO) (0) | 2024.08.17 |
airflow ExternalTaskSensor와 TriggerDagRunOperator (0) | 2024.08.17 |
댓글