Xcom은 Cross-Communications의 약자로 task 간 데이터를 교환하는 데 사용되는 기능이다.
key-value 형태로 key, value, timestamp, task_id, dag_id 등의 메타데이터가 db에 저장된다.
xcom_push, xcom_pull 메소드를 사용해 Xcom을 push/pull 할 수 있으며, Operator의 반환 값과 taskflow에서 사용하는 argument들은 자동으로 Xcom에 저장된다.
일반적으로는 airflow 메타 디비에 저장되나, custom한 Xcom을 사용하면 s3 등의 별도 저장소에 저장할 수 있다.
Xcom 사용 시 유의사항
1. Xcom은 SQLAlchemy의 LargeBinary에 저장된다.
- SQLite : blob type에 저장되며 2GB 제한
- PostgreSQL : bytea type에 저장되며 1GB 제한
- MySQL : blob type에 저장되며 64kb 제한
2. 암호화 되지 않고 db에 저장되기 때문에 민감한 정보를 다루기에는 좋지 않다.
1. Operator를 사용해 Xcom push / pull
xcom_push / xcom_pull 메서드를 사용해 Xcom을 push / pull 할 수 있고, Operator의 return 값은 자동으로
Xcom에 push된다. Operator의 return 값의 Xcom key는 `return_value`이다.
ti.xcom_pull(task_ids='{task_id 입력}')과 같이 사용하면 해당 task의 return value를 pull 할 수 있다.
from airflow.decorators import dag, task
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="Xcom",
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "Xcom"],
)
# PythonOperator로 Xcom push
def py_xcom_push(**context):
context["ti"].xcom_push(key="xcom_push_python_key", value="xcom_push_python_value")
return "py_xcom"
# PythonOperator로 Xcom pull
def py_xcom_pull(**context):
print(context["ti"].xcom_pull(key="xcom_push_python_key"))
print(context["ti"].xcom_pull(task_ids="xcom_push_python"))
# BashOperator로 Xcom push
xcom_push_bash = BashOperator(bash_command="echo {{ ti.xcom_push(key='xcom_push_bash_key', value='xcom_push_bash_value') }} && echo 'bash_xcom'",
task_id="xcom_push_bash",
dag=dag)
xcom_push_python = PythonOperator(python_callable=py_xcom_push, task_id="xcom_push_python", dag=dag)
# BashOperator로 Xcom pull
xcom_pull_bash = BashOperator(bash_command="echo $RETURN_VALUE && echo $XCOM_PUSH_BASH_VALUE",
env= {
"RETURN_VALUE" : "{{ ti.xcom_pull(key='xcom_push_bash_key') }}",
"XCOM_PUSH_BASH_VALUE" : "{{ ti.xcom_pull(task_ids='xcom_push_bash') }}"
},
task_id="xcom_pull_bash",
dag=dag)
xcom_pull_python = PythonOperator(python_callable=py_xcom_pull, task_id="xcom_pull_python", dag=dag)
xcom_push_bash >> xcom_push_python >> xcom_pull_bash >> xcom_pull_python
xcom_push_bash 결과
xcom_push_python 결과
xcom_pull_bash 결과
xcom_pull_python 결과
2. TaskFlow 사용하기
task 함수의 반환값은 자동으로 Xcom에 저장되고, task의 반환값을 다음 task의 인자로 직접 사용할 수 있다.
t2 = task2(t1)와 같은 방식으로 태스크 간의 의존성을 명확하게 표현할 수 있다.
import pendulum
from airflow.decorators import dag, task
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "Xcom"],
)
def taskflow_xcom():
@task
def task1():
return "task1"
@task
def task2(task1_value):
print(task1_value)
return "task2"
@task
def task3(task2_value):
print(task2_value)
t1 = task1()
t2 = task2(t1)
task3(t2)
taskflow_xcom()
TaskFlow multiple_outputs 사용하기
multiple_outputs는 태스크에서 딕셔너리를 리턴할 수 있게 해주는 기능이다. 이 기능을 사용하면 딕셔너리의 key-value가 Xcom에 별도로 저장된다.
import pendulum
from airflow.decorators import dag, task
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "Xcom"],
)
def multiple_outputs_example():
@task(multiple_outputs=True)
def return_dict():
return {
"value_1": "첫 번째 값",
"value_2": 42
}
@task
def use_value_1(value_1):
print(f"value_1: {value_1}")
@task
def use_value_2(value_2):
print(f"value_2: {value_2}")
dict_values = return_dict()
use_value_1(dict_values["value_1"])
use_value_2(dict_values["value_2"])
multiple_outputs_example()
task가 딕셔너리를 반환할 때 multiple_outputs을 True로 지정하지 않고 위와 같이 사용하면 아래의 에러가 발생하게 된다.
airflow.exceptions.XComNotFound: XComArg result from return_dict at Xcom with key="value_1" is not found!
'aiflow' 카테고리의 다른 글
airflow sensor: poke와 reschedule (0) | 2024.08.13 |
---|---|
airflow Taskflow api: @dag, @task (0) | 2024.08.11 |
airflow trigger rule (1) | 2024.08.11 |
airflow task 의존성 (0) | 2024.08.11 |
airflow connection과 hook (0) | 2024.08.10 |
댓글