본문 바로가기
aiflow

airflow DAG 시각화 및 구조화: Log Grouping, Edge Labels, Task Groups 활용하기

by kyeongseo.oh 2024. 9. 4.

1. Log Grouping

Log Grouping을 사용하면 작업 로그를 구조화하여 필요한 정보만 빠르게 확인할 수 있다.

import pendulum
from airflow.decorators import dag, task

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    max_active_tasks=3,
    tags=["example", "log grouping"],
)
def log_group():
    
    @task
    def task1():
        print("::group:: mysql 데이터 추출")
        print("데이터 추출 중...")
        print("추출된 레코드 수: 1000")
        print("::endgroup::")

        print("::group:: postgres 데이터 추출")
        print("데이터 추출 중...")
        print("추출된 레코드 수: 2000")
        print("::endgroup::")

    task1()

log_group()

 

로그가 기본적으로 접혀있는 상태로 제공된다.

 

 디테일한 내용을 보고 싶을 때 확장하여 상세 내용을 볼 수 있다.

 

2. Edge Labels

Edge Labels를 사용하여 태스크 간의 관계를 명확히 표현할 수 있다.

import pendulum
from airflow.decorators import dag, task
from airflow.utils.edgemodifier import Label

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    max_active_tasks=3,
    tags=["example", "Label"],
)
def label_dag():

    @task
    def task1():
        print("1")

    @task
    def task2():
        print("2")

    @task
    def task3():
        print("3")

    task1() >> Label("success") >> task2() >> Label("between task2 and task3") >> task3()

label_dag()

 

위와 같이 DAG를 작성하면 task1에서 task2로 가는 엣지에 'success', task2에서 task3으로 가는 엣지에 'between task2 adn task3' 라벨이 붙게 된다.

 

3. Task Groups

Task Groups를 사용하여 관련된 작업들을 묶어 DAG의 구조를 개선할 수 있다.

이 DAG의 실행 흐름은 `start -> (group: task1 -> (task2, task3)) -> end` 와 같다.

import pendulum
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    max_active_tasks=3,
    tags=["example", "task_group"],
)
def task_group():
    
    @task
    def start():
        print("start")

    with TaskGroup("group") as task_group:
        @task
        def task1():
            print("1")

        @task
        def task2():
            print("2")

        @task
        def task3():
            print("3")

        # taskgroup 내에서의 task 의존성 설정
        task1() >> [task2(), task3()]

    @task
    def end():
        print("end")  

    start() >> task_group >> end()

task_group()

 

위와 같이 DAG를 작성하면 'group' 내의 태스크들이 그룹화되어 DAG 시각화에서 하나의 단위로 표시된다.

 

그룹화된 DAG를 펼치면 아래와 같은 화면을 볼 수 있다.

댓글