본문 바로가기
aiflow

airflow Taskflow api: @dag, @task

by kyeongseo.oh 2024. 8. 11.

1. Taskflow API 소개

Taskflow API는 Airflow에서 Python 함수를 간단하게 DAG 태스크로 변환할 수 있는 기능이다. 이 API를 사용하면 다음과 같은 이점이 있다.

  • 기존 Python 코드를 Airflow로 쉽게 이관할 수 있다.
  • 코드의 가독성과 유지보수성을 향상시킬 수 있다.
  • DAG 구조를 더 직관적으로 표현할 수 있다.

2. 기본 사용법

Taskflow API의 핵심 데코레이터는 아래와 같다.

  • @dag : DAG를 정의하는 데코레이터이다.
  • @task : 개별 태스크를 정의하는 데코레이터이다.

기본 예시

import pendulum
from airflow.decorators import dag, task
import json

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "Taskflow"],
)
def tutorial_taskflow_api():

    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        return json.loads(data_string)

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = sum(order_data_dict.values())
        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])

tutorial_taskflow_api()

예시 설명:

  • extract 태스크는 데이터를 추출하는 기능이다.
  • transform 태스크는 데이터를 변환하는 기능이다.
  • load 태스크는 결과를 출력하는 기능이다.

3. 고급 기능

3.1 다양한 태스크 유형

Taskflow API가 지원하는 태스크는 다음과 같다.

class TaskDecoratorCollection:
    """Implementation to provide the ``@task`` syntax."""

    python = staticmethod(python_task)
    virtualenv = staticmethod(virtualenv_task)
    external_python = staticmethod(external_python_task)
    branch = staticmethod(branch_task)
    branch_virtualenv = staticmethod(branch_virtualenv_task)
    branch_external_python = staticmethod(branch_external_python_task)
    short_circuit = staticmethod(short_circuit_task)
    sensor = staticmethod(sensor_task)
    bash = staticmethod(bash_task)
    run_if = staticmethod(run_if)
    skip_if = staticmethod(skip_if)

 

 

task.bash와 task.sensor을 사용한 예시

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "Taskflow"],
)
def advanced_taskflow_example():

    @task.bash(env={"FILENAME": 'test.txt'})
    def create_file():
        return "echo 'create file test' > /home/airflow/$FILENAME"

    @task.sensor(mode='poke', poke_interval=10, timeout=30)
    def check_file_exists():
        import os
        return os.path.exists("/home/airflow/test.txt")

    @task
    def read_file():
        with open("/home/airflow/test.txt", "r") as f:
            print(f.read())

    create_file() >> check_file_exists() >> read_file()

advanced_taskflow_example()

예시 설명:

  • create_file은 Bash 명령으로 파일을 생성하는 태스크이다.
  • check_file_exists는 센서로 파일 존재를 확인하는 태스크이다.
  • read_file은 Python 함수로 파일 내용을 읽는 태스크이다.

3.2 조건부 실행

Airflow 2.10.0부터 run_ifskip_if 데코레이터를 사용하여 조건부 태스크 실행이 가능한 기능이다:

import os
from airflow.decorators import dag, task
import pendulum

def file_check(context):
    return os.path.exists("/home/airflow/test.txt")

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz="Asia/Seoul"),
    catchup=False,
    tags=["example", "Taskflow"],
)
def conditional_task_example():

    @task.run_if(file_check, skip_message="file not exists")
    def run_if_task():
        print("run if task activate")

    @task.skip_if(file_check, skip_message='file exists')
    def skip_if_task():
        print("skip if task activate")

    [run_if_task(), skip_if_task()]

conditional_task_example()

예시 설명:

  • run_if_task는 파일이 존재할 때만 실행되는 태스크이다.
  • skip_if_task는 파일이 존재하면 스킵되는 태스크이다.

4. 주요 특징 요약

  • 간단한 Python 함수를 Airflow 태스크로 변환하는 기능이다.
  • 다양한 태스크 유형을 지원하는 시스템이다 (Python, Bash, Sensor 등).
  • 태스크 간 데이터 흐름을 명확하게 표현하는 방법이다.
  • 조건부 실행 기능으로 유연한 워크플로우를 구성하는 도구이다.

'aiflow' 카테고리의 다른 글

airflow ExternalTaskSensor와 TriggerDagRunOperator  (0) 2024.08.17
airflow sensor: poke와 reschedule  (0) 2024.08.13
airflow Xcom  (0) 2024.08.11
airflow trigger rule  (1) 2024.08.11
airflow task 의존성  (0) 2024.08.11

댓글