본문 바로가기
aiflow

airflow Params

by kyeongseo.oh 2024. 8. 17.

1. Airflow Params 개요

  • DAG와 태스크에 동적으로 값을 전달하는 방법이다.
  • 런타임에 추가 Params를 제공하거나 Params 값을 덮어쓸 수 있어 DAG의 유연성이 높아진다.
  • 스케줄링된 DAG는 Params에 설정된 기본값을 사용한다.
  • DAG를 수동으로 시작하면 Params를 수정해 DAG를 실행할 수 있다.
  • 동일한 DAG를 다른 파라미터로 여러 번 실행할 수 있어 DAG의 재사용성이 높아진다.

2. Params를 사용 사례

  • 특정 날짜의 데이터를 재처리 하는 CASE가 있는 경우, 날짜를 Params로 지정한다.
  • 개발 / 테스트 / 운영 환경에 따라 다른 database를 사용하는 경우, 연결 정보를 Params로 지정한다.
  • 파라미터를 변경하며, 성능 테스트를 진행하는 경우, 파라미터를 Params로 지정한다.
  • 메일링 리스트에 인원을 추가해야하는 경우, 이메일 주소를 Params로 지정한다.

3. Params 정의 방법

Airflow DAG에서 Params를 정의하는 방법은 다음과 같다.

 

3-1. 기본 구조

  • Params는 DAG 데코레이터의 params 인자에 딕셔너리 형태로 정의한다.
  • 각 파라미터는 키-값 쌍으로, 값은 Param 객체로 설정하는 방식이다.

3-2. Param 객체

  • airflow.models.param.Param 클래스를 사용하여 생성하는 객체이다.
  • 파라미터의 타입, 기본값, 제약 조건 등을 지정할 수 있다.

3-3. 파라미터 정의 예시

import pendulum
from airflow.decorators import dag, task
from airflow.models.param import Param

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    params= {
        "p1" : Param(default=5, type="integer", minimum=3, maximum=10),
        "p2" : Param(5, type=["null", "number", "string"]),
        "p3": Param("foo", enum=["foo", "bar", 5]),
        "p4" : Param(f"{pendulum.now().date()}", type="string", format="date"),
        "p5" : Param({"key": "value"}, type=["object", "null"])
    },
    tags=["example", "Param"],
)

 

  • 정수형 파라미터 (p1):
    • 기본값이 5인 정수형 파라미터이다.
    • 최소값 3, 최대값 10의 범위 제한이 있는 파라미터이다.
  • 다중 타입 파라미터 (p2):
    • 기본값이 5이며, null, 숫자, 문자열 타입을 모두 허용하는 파라미터이다.
  • 열거형 파라미터 (p3):
    • 기본값이 "foo"이며, "foo", "bar", 5 중 하나의 값만 선택 가능한 파라미터이다.
  • 날짜 형식 파라미터 (p4):
    • 현재 날짜를 기본값으로 가지는 문자열 타입 파라미터이다.
    • 날짜 형식으로 지정된 파라미터이다.
  • 객체 또는 null 파라미터 (p5):
    • 기본값으로 딕셔너리를 가지는 파라미터이다.
    • 객체(딕셔너리) 또는 null 값을 허용하는 파라미터이다.

 

4. Params 사용 방법

Jinja 템플릿을 통해 Params의 값을 추출해 사용할 수 있다. {{ params.parameter_name }} 와 같은 형태로 사용한다.

import pendulum
from airflow.decorators import dag, task
from airflow.models.param import Param

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    params= {
        "p1" : Param(default=5, type="integer", minimum=3, maximum=10),
        "p2" : Param(5, type=["null", "number", "string"]),
        "p3": Param("foo", enum=["foo", "bar", 5]),
        "p4" : Param(f"{pendulum.now().date()}", type="string", format="date"),
        "p5" : Param({"key": "value"}, type=["object", "null"])
    },
    tags=["example", "Param"],
)
def param_test():

    @task
    def print_param(**kwargs):
        params = kwargs["params"]

        print(params["p1"])
        print(params["p2"])
        print(params["p3"])
        print(params["p4"])
        print(params["p5"])

    print_param()

param_test()

 

5. DAG에 값 전달하기

Trigger DAG 버튼을 클릭해 수동으로 DAG를 실행하면 아래와 같이 Params 값을 설정할 수 있는 UI가 표시되고, 값을 설정하면 해당 값이 DAG에 적용된다.

 

사용자가 제공한 값이 검증을 통과하지 못하면 아래와 같이 경고가 표시된다.

댓글