본문 바로가기
aiflow

airflow Xcom

by kyeongseo.oh 2024. 8. 11.

Xcom은 Cross-Communications의 약자로 task 간 데이터를 교환하는 데 사용되는 기능이다.

key-value 형태로 key, value, timestamp, task_id, dag_id 등의 메타데이터가 db에 저장된다.

xcom_push, xcom_pull 메소드를 사용해 Xcom을 push/pull 할 수 있으며, Operator의 반환 값과 taskflow에서 사용하는 argument들은 자동으로 Xcom에 저장된다.

 

일반적으로는 airflow 메타 디비에 저장되나, custom한 Xcom을 사용하면 s3 등의 별도 저장소에 저장할 수 있다.

 

Xcom 사용 시 유의사항

1. Xcom은 SQLAlchemy의 LargeBinary에 저장된다.
    - SQLite : blob type에 저장되며 2GB 제한
    - PostgreSQL : bytea type에 저장되며 1GB 제한
    - MySQL : blob type에 저장되며 64kb 제한

2. 암호화 되지 않고 db에 저장되기 때문에 민감한 정보를 다루기에는 좋지 않다.

 

1. Operator를 사용해 Xcom push / pull

xcom_push / xcom_pull 메서드를 사용해 Xcom을 push / pull 할 수 있고, Operator의 return 값은 자동으로

Xcom에 push된다. Operator의 return 값의 Xcom key는 `return_value`이다.

ti.xcom_pull(task_ids='{task_id 입력}')과 같이 사용하면 해당 task의 return value를 pull 할 수 있다.

from airflow.decorators import dag, task
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="Xcom",
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "Xcom"],
)

# PythonOperator로 Xcom push
def py_xcom_push(**context):
    context["ti"].xcom_push(key="xcom_push_python_key", value="xcom_push_python_value")
    return "py_xcom"

# PythonOperator로 Xcom pull
def py_xcom_pull(**context):
    print(context["ti"].xcom_pull(key="xcom_push_python_key"))
    print(context["ti"].xcom_pull(task_ids="xcom_push_python"))

# BashOperator로 Xcom push
xcom_push_bash = BashOperator(bash_command="echo {{ ti.xcom_push(key='xcom_push_bash_key', value='xcom_push_bash_value') }} && echo 'bash_xcom'", 
                                task_id="xcom_push_bash", 
                                dag=dag)

xcom_push_python = PythonOperator(python_callable=py_xcom_push, task_id="xcom_push_python", dag=dag)

# BashOperator로 Xcom pull
xcom_pull_bash = BashOperator(bash_command="echo $RETURN_VALUE && echo $XCOM_PUSH_BASH_VALUE", 
                                env= {
                                    "RETURN_VALUE" : "{{ ti.xcom_pull(key='xcom_push_bash_key') }}",
                                    "XCOM_PUSH_BASH_VALUE" : "{{ ti.xcom_pull(task_ids='xcom_push_bash') }}"
                                },
                                task_id="xcom_pull_bash", 
                                dag=dag)

xcom_pull_python = PythonOperator(python_callable=py_xcom_pull, task_id="xcom_pull_python", dag=dag)


xcom_push_bash >> xcom_push_python >> xcom_pull_bash >> xcom_pull_python

 

 

xcom_push_bash 결과

 

xcom_push_python 결과

 

xcom_pull_bash 결과

 

xcom_pull_python 결과

 

2. TaskFlow 사용하기

task 함수의 반환값은 자동으로 Xcom에 저장되고, task의 반환값을 다음 task의 인자로 직접 사용할 수 있다.

 t2 = task2(t1)와 같은 방식으로 태스크 간의 의존성을 명확하게 표현할 수 있다.

import pendulum
from airflow.decorators import dag, task

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "Xcom"],
)
def taskflow_xcom():

    @task
    def task1():
        return "task1"

    @task
    def task2(task1_value):
        print(task1_value)
        return "task2"

    @task
    def task3(task2_value):
        print(task2_value)

    t1 = task1()
    t2 = task2(t1)
    task3(t2)

taskflow_xcom()

 

TaskFlow multiple_outputs 사용하기

multiple_outputs는 태스크에서 딕셔너리를 리턴할 수 있게 해주는 기능이다. 이 기능을 사용하면 딕셔너리의 key-value가 Xcom에 별도로 저장된다.

 

import pendulum
from airflow.decorators import dag, task

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "Xcom"],
)
def multiple_outputs_example():

    @task(multiple_outputs=True)
    def return_dict():
        return {
            "value_1": "첫 번째 값",
            "value_2": 42
        }

    @task
    def use_value_1(value_1):
        print(f"value_1: {value_1}")

    @task
    def use_value_2(value_2):
        print(f"value_2: {value_2}")

    dict_values = return_dict()
    use_value_1(dict_values["value_1"])
    use_value_2(dict_values["value_2"])

multiple_outputs_example()

 

task가 딕셔너리를 반환할 때 multiple_outputs을 True로 지정하지 않고 위와 같이 사용하면 아래의 에러가 발생하게 된다.

airflow.exceptions.XComNotFound: XComArg result from return_dict at Xcom with key="value_1" is not found!

'aiflow' 카테고리의 다른 글

airflow sensor: poke와 reschedule  (0) 2024.08.13
airflow Taskflow api: @dag, @task  (0) 2024.08.11
airflow trigger rule  (1) 2024.08.11
airflow task 의존성  (0) 2024.08.11
airflow connection과 hook  (0) 2024.08.10

댓글