XCom(Cross-communication)은 태스크 간 데이터를 공유하는 기능이다. 기본적으로 XCom 데이터는 Airflow 메타데이터 데이터베이스에 저장되지만, 대용량 데이터를 처리할 때는 이 방식이 제한적일 수 있다.
XComObjectStorageBackend을 사용해 XCom을 object storage에 저장하면 이런 제약에서 벗어날 수 있다.
XComObjectStorageBackend 개요
XComObjectStorageBackend는 Apache Airflow의 공식 프로바이더 패키지에 포함된 기능으로 XCom 데이터를 object storage에 저장할 수 있게 해준다.
XComObjectStorageBackend 설정 및 구성
1. 필요한 패키지 설치
Amazon Web Service Connection을 생성해야 하기에 amazon provider를 설치한다.
pip install apache-airflow-providers-amazon
2. aws connection 생성
bucket에 XCom을 저장할 수 있도록 인증 정보를 추가한 connection을 생성한다.
3. airflow.cfg 파일 수정
다음 구성은 aws_default connection을 사용해 sandbox bucket의 xcom 경로 하위에 데이터를 저장한다.
xcom_objectstorage_threshold를 `1048576`으로 설정하면 1MB 이상의 XCom만 S3에 저장하고 그 이하는 기존과 동일하게 메타데이터 데이터베이스에 저장한다.
- xcom_objectstorage_path : XCom 데이터가 저장될 객체 스토리지의 경로를 지정
- xcom_objectstorage_threshold : XCom 데이터가 객체 스토리지에 저장되기 위한 최소 크기 임계값(바이트)을 지정
- xcom_objectstorage_compression : XCom 데이터의 압축 방식을 지정
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://aws_default@sandbox/xcom
xcom_objectstorage_threshold = 0
xcom_objectstorage_compression = gzip
4. airflow 재시작
airflow.cfg를 수정한 후에는 적용을 위해 airflow webserver와 scheduler를 재시작해야 한다.
※ troubleshooting
airflow 재시작 시 아래와 같은 에러가 발생했다.
sqlalchemy.exc.InvalidRequestError: Table 'xcom' is already defined for this MetaData instance. Specify 'extend_existing=True' to redefine options and columns on an existing Table object.
위의 에러는 apache-airflow-providers-common-io 버전이 낮아 발생하는 에러로, apache-airflow-providers-common-io의 버전을 1.4.0 이상으로 업그레이드 해주면 문제가 해결된다.
아래 커맨드를 사용해 apache-airflow-providers-common-io을 업그레이드 한 후 airflow를 재시작한다.
pip install -U apache-airflow-providers-common-io==1.4.0
사용 예제
XComObjectStorageBackend를 사용할 때의 기본적인 XCom 사용 방식은 기존과 동일하다. 즉, xcom_push와 xcom_pull 메서드를 그대로 사용할 수 있다.
기존과의 차이점은 UI에 실제 데이터 내용이 아닌 S3 객체의 경로가 출력된다는 점이다.
XCom 데이터가 S3에 저장될 때 사용되는 경로의 구조는 다음과 같다.
s3://sandbox/xcom/<dag_id>/<run_id>/<task_id>/<UUID>.gz
'aiflow' 카테고리의 다른 글
Airflow Custom Connection Type 생성 및 Provider 빌드 가이드: hbase provider 개발하기 (1) | 2024.09.15 |
---|---|
airflow custom sensor 개발 가이드: hbase sensor 개발하기 (0) | 2024.09.14 |
airflow custom operator 개발 가이드: hbase operator 개발하기 (1) | 2024.09.12 |
airflow custom hook 개발 가이드: hbase hook 개발하기 (0) | 2024.09.12 |
airflow DAG 시각화 및 구조화: Log Grouping, Edge Labels, Task Groups 활용하기 (0) | 2024.09.04 |
댓글