본문 바로가기
aiflow

S3에 XCom 저장하기: XComObjectStorageBackend 설정 가이드

by kyeongseo.oh 2024. 9. 16.

XCom(Cross-communication)은 태스크 간 데이터를 공유하는 기능이다. 기본적으로 XCom 데이터는 Airflow 메타데이터 데이터베이스에 저장되지만, 대용량 데이터를 처리할 때는 이 방식이 제한적일 수 있다.

XComObjectStorageBackend을 사용해 XCom을 object storage에 저장하면 이런 제약에서 벗어날 수 있다.

 

XComObjectStorageBackend 개요

XComObjectStorageBackend는 Apache Airflow의 공식 프로바이더 패키지에 포함된 기능으로 XCom 데이터를 object storage에 저장할 수 있게 해준다.

 

XComObjectStorageBackend  설정 및 구성

1. 필요한 패키지 설치

Amazon Web Service Connection을 생성해야 하기에 amazon provider를 설치한다.

pip install apache-airflow-providers-amazon

 

2. aws connection 생성

bucket에 XCom을 저장할 수 있도록 인증 정보를 추가한 connection을 생성한다.

 

3. airflow.cfg 파일 수정

다음 구성은 aws_default connection을 사용해 sandbox bucket의 xcom 경로 하위에 데이터를 저장한다.

xcom_objectstorage_threshold를 `1048576`으로 설정하면 1MB 이상의 XCom만 S3에 저장하고 그 이하는 기존과 동일하게 메타데이터 데이터베이스에 저장한다.

 

  • xcom_objectstorage_path : XCom 데이터가 저장될 객체 스토리지의 경로를 지정
  • xcom_objectstorage_threshold : XCom 데이터가 객체 스토리지에 저장되기 위한 최소 크기 임계값(바이트)을 지정
  • xcom_objectstorage_compression : XCom 데이터의 압축 방식을 지정
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://aws_default@sandbox/xcom
xcom_objectstorage_threshold = 0
xcom_objectstorage_compression = gzip

 

4. airflow 재시작

airflow.cfg를 수정한 후에는 적용을 위해 airflow webserver와 scheduler를 재시작해야 한다.

 

※ troubleshooting

airflow 재시작 시 아래와 같은 에러가 발생했다. 

sqlalchemy.exc.InvalidRequestError: Table 'xcom' is already defined for this MetaData instance.  Specify 'extend_existing=True' to redefine options and columns on an existing Table object.

 

위의 에러는 apache-airflow-providers-common-io 버전이 낮아 발생하는 에러로, apache-airflow-providers-common-io의 버전을 1.4.0 이상으로 업그레이드 해주면 문제가 해결된다.

 

아래 커맨드를 사용해 apache-airflow-providers-common-io을 업그레이드 한 후 airflow를 재시작한다.

pip install -U apache-airflow-providers-common-io==1.4.0

 

사용 예제

XComObjectStorageBackend를 사용할 때의 기본적인 XCom 사용 방식은 기존과 동일하다. 즉, xcom_push와 xcom_pull 메서드를 그대로 사용할 수 있다.

 

기존과의 차이점은 UI에 실제 데이터 내용이 아닌 S3 객체의 경로가 출력된다는 점이다.

 

XCom 데이터가 S3에 저장될 때 사용되는 경로의 구조는 다음과 같다.

s3://sandbox/xcom/<dag_id>/<run_id>/<task_id>/<UUID>.gz

댓글