1. Taskflow API 소개
Taskflow API는 Airflow에서 Python 함수를 간단하게 DAG 태스크로 변환할 수 있는 기능이다. 이 API를 사용하면 다음과 같은 이점이 있다.
- 기존 Python 코드를 Airflow로 쉽게 이관할 수 있다.
- 코드의 가독성과 유지보수성을 향상시킬 수 있다.
- DAG 구조를 더 직관적으로 표현할 수 있다.
2. 기본 사용법
Taskflow API의 핵심 데코레이터는 아래와 같다.
@dag
: DAG를 정의하는 데코레이터이다.@task
: 개별 태스크를 정의하는 데코레이터이다.
기본 예시
import pendulum
from airflow.decorators import dag, task
import json
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz="Asia/Seoul"),
catchup=False,
tags=["example", "Taskflow"],
)
def tutorial_taskflow_api():
@task()
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
total_order_value = sum(order_data_dict.values())
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_taskflow_api()
예시 설명:
- extract 태스크는 데이터를 추출하는 기능이다.
- transform 태스크는 데이터를 변환하는 기능이다.
- load 태스크는 결과를 출력하는 기능이다.
3. 고급 기능
3.1 다양한 태스크 유형
Taskflow API가 지원하는 태스크는 다음과 같다.
class TaskDecoratorCollection:
"""Implementation to provide the ``@task`` syntax."""
python = staticmethod(python_task)
virtualenv = staticmethod(virtualenv_task)
external_python = staticmethod(external_python_task)
branch = staticmethod(branch_task)
branch_virtualenv = staticmethod(branch_virtualenv_task)
branch_external_python = staticmethod(branch_external_python_task)
short_circuit = staticmethod(short_circuit_task)
sensor = staticmethod(sensor_task)
bash = staticmethod(bash_task)
run_if = staticmethod(run_if)
skip_if = staticmethod(skip_if)
task.bash와 task.sensor을 사용한 예시
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz="Asia/Seoul"),
catchup=False,
tags=["example", "Taskflow"],
)
def advanced_taskflow_example():
@task.bash(env={"FILENAME": 'test.txt'})
def create_file():
return "echo 'create file test' > /home/airflow/$FILENAME"
@task.sensor(mode='poke', poke_interval=10, timeout=30)
def check_file_exists():
import os
return os.path.exists("/home/airflow/test.txt")
@task
def read_file():
with open("/home/airflow/test.txt", "r") as f:
print(f.read())
create_file() >> check_file_exists() >> read_file()
advanced_taskflow_example()
예시 설명:
create_file
은 Bash 명령으로 파일을 생성하는 태스크이다.check_file_exists
는 센서로 파일 존재를 확인하는 태스크이다.read_file
은 Python 함수로 파일 내용을 읽는 태스크이다.
3.2 조건부 실행
Airflow 2.10.0부터 run_if
와 skip_if
데코레이터를 사용하여 조건부 태스크 실행이 가능한 기능이다:
import os
from airflow.decorators import dag, task
import pendulum
def file_check(context):
return os.path.exists("/home/airflow/test.txt")
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz="Asia/Seoul"),
catchup=False,
tags=["example", "Taskflow"],
)
def conditional_task_example():
@task.run_if(file_check, skip_message="file not exists")
def run_if_task():
print("run if task activate")
@task.skip_if(file_check, skip_message='file exists')
def skip_if_task():
print("skip if task activate")
[run_if_task(), skip_if_task()]
conditional_task_example()
예시 설명:
run_if_task
는 파일이 존재할 때만 실행되는 태스크이다.skip_if_task
는 파일이 존재하면 스킵되는 태스크이다.
4. 주요 특징 요약
- 간단한 Python 함수를 Airflow 태스크로 변환하는 기능이다.
- 다양한 태스크 유형을 지원하는 시스템이다 (Python, Bash, Sensor 등).
- 태스크 간 데이터 흐름을 명확하게 표현하는 방법이다.
- 조건부 실행 기능으로 유연한 워크플로우를 구성하는 도구이다.
'aiflow' 카테고리의 다른 글
airflow ExternalTaskSensor와 TriggerDagRunOperator (0) | 2024.08.17 |
---|---|
airflow sensor: poke와 reschedule (0) | 2024.08.13 |
airflow Xcom (0) | 2024.08.11 |
airflow trigger rule (1) | 2024.08.11 |
airflow task 의존성 (0) | 2024.08.11 |
댓글