본문 바로가기

aiflow25

S3에 XCom 저장하기: XComObjectStorageBackend 설정 가이드 XCom(Cross-communication)은 태스크 간 데이터를 공유하는 기능이다. 기본적으로 XCom 데이터는 Airflow 메타데이터 데이터베이스에 저장되지만, 대용량 데이터를 처리할 때는 이 방식이 제한적일 수 있다. XComObjectStorageBackend을 사용해 XCom을 object storage에 저장하면 이런 제약에서 벗어날 수 있다.  XComObjectStorageBackend 개요 XComObjectStorageBackend는 Apache Airflow의 공식 프로바이더 패키지에 포함된 기능으로 XCom 데이터를 object storage에 저장할 수 있게 해준다.  XComObjectStorageBackend  설정 및 구성 1. 필요한 패키지 설치Amazon Web Ser.. 2024. 9. 16.
Airflow Custom Connection Type 생성 및 Provider 빌드 가이드: hbase provider 개발하기 hbase provider를 빌드하고, hbase custom connection type을 추가하는 방법을 소개한다.이전에 개발한 hbase hook, operator, sensor를 통합해 provider로 빌드한다. 전체 코드는 다음 Git 저장소에 push 되어 있다.git : https://github.com/kyeongseooh/airflow-provider-hbase1. 프로젝트 생성poetry를 사용해 프로젝트를 생성하고, 필요한 의존성을 추가한다. pyproject.toml를 적절하게 수정해야 한다. 프로젝트 생성poetry new airflow-provider-hbasecd airflow-provider-hbase pyproject.toml 파일 수정 및 의존성 추가[tool.poetr.. 2024. 9. 15.
airflow custom sensor 개발 가이드: hbase sensor 개발하기 airflow에서 hbase custom sensor를 개발하는 방법을 소개한다. 이 예제에서는 HBaseRowExistsSensor를 개발하여 특정 row key가 HBase 테이블에 존재하는지 확인한다. 아래 링크에서 개발한 hbase hook을 사용했다. airflow custom hook 가이드 : airflow custom hook 개발 가이드: hbase hook 개발하기 1. Custom Sensor의 구조 custom sensor는 일반적으로 다음과 같은 구조를 가진다. airflow의 BaseSensorOperator를 상속받는다.__init__ 메서드를 정의해 필요한 매개변수를 설정한다.poke 메서드를 구현해 실제 센싱 작업을 수행한다. mode에 poke와 reschedule가 있는.. 2024. 9. 14.
airflow custom operator 개발 가이드: hbase operator 개발하기 airflow에서 hbase custom operator를 개발하는 방법을 소개한다. 이 예제에서는 HBaseBatchPutOperator와 HBaseScanOperator 두 개의 custom operator를 개발하였고, 아래 링크에서 개발한 hbase hook을 사용했다. airflow custom hook 가이드 : airflow custom hook 개발 가이드: hbase hook 개발하기 1. Custom Operator의 구조custom operator는 일반적으로 다음과 같은 구조를 가진다.airflow의 BaseOperator를 상속받는다.__init__ 메서드를 정의해 필요한 매개변수를 설정한다.execute 메서드를 구현해 실제 작업을 수행한다.2. 전체 코드 `dags/includ.. 2024. 9. 12.
airflow custom hook 개발 가이드: hbase hook 개발하기 airflow에서 hbase custom hook을 개발하는 방법을 소개한다. 차후 provider 빌드 시 hbase custom connection type을 추가할 예정이며, 아직 hbase custom connection type은 추가하지 않았기 때문에 generic type을 통해 연결 정보를 추가한다. 전체 코드from airflow.hooks.base import BaseHookfrom typing import Any, Dict, Optionalimport happybaseclass HBaseHook(BaseHook): def __init__( self, *, # 위치 기반 인수를 허용하지 않음 hbase_conn_id: str = "hbase_.. 2024. 9. 12.
airflow DAG 시각화 및 구조화: Log Grouping, Edge Labels, Task Groups 활용하기 1. Log GroupingLog Grouping을 사용하면 작업 로그를 구조화하여 필요한 정보만 빠르게 확인할 수 있다.import pendulumfrom airflow.decorators import dag, task@dag( schedule="@once", start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")), catchup=False, max_active_tasks=3, tags=["example", "log grouping"],)def log_group(): @task def task1(): print("::group:: mysql 데이터 추출") .. 2024. 9. 4.
airflow에서 remote spark cluster에 job submit 하기: SparkSubmitOperator airflow에서 SparkSubmitOperator와 task decorator를 사용해 remote spark cluster에 job을 submit하는 방법을 알아본다.spark on yarn 환경으로 spark 3.3.2, hadoop 3이 설치되어 있다. 1. spark binary 다운로드airflow에서 remote spark에 job을 submit 하기 위해서는 airflow가 설치된 서버에 spark-submit 호출이 가능해야 한다.이를 위해 spark binary를 다운받아 spark-submit이 가능하도록 했다.spark 3.3.2, hadoop 3을 사용하고 있어 spark-3.3.2-bin-hadoop3를 다운받았다.## binary 다운로드wget https://archive... 2024. 9. 3.
airflow Variable 1. Variables 개요airflow에서 사용하는 key-value 저장소의 역할을 한다.airflow에서 관리하는 글로벌 환경변수로 여러 DAG에서 공유해야 하는 설정 값을 저장한다.DAG 코드의 변경 없이 런타임에 설정을 수정할 수 있다.airflow 메타 데이터베이스에 저장되기에 너무 많은 Variables는 성능에 영향을 줄 수 있다. 2. Variables과 Fernetairflow는 메타스토어 데이터베이스에 저장된 변수를 암호화하기 위해 Fernet을 사용한다.airflow 설치 시 설정한 fernetKey가 사용되며, fernetKey를 설정하지 않고 airflow를 설치한 경우에는 암호화 되지 않는다.UI 상에서도 is Encrypted의 값이 다른 것을 확인할 수 있다.[ fernet.. 2024. 9. 2.
airflow Dynamic Task Mapping 1. Dynamic Task Mapping 개요런타임에 task의 수와 argument를 동적으로 결정할 수 있다.병렬 처리를 통해 처리 시간을 단축할 수 있다.과도한 병렬 처리로 시스템 부하를 유발할 수 있으므로, 적절한 max_active_tasks 설정이 필요하다.각 task에 다른 argument를 전달할 수 있다.mapreduce의 map과 비슷한 개념이라고 볼 수 있다. 2. 주요 개념2-1. expand입력받은 argument를 각 task에 동적으로 매핑해 여러 task instance를 생성한다.아래는 expand를 사용한 간단한 예제이다.@taskdef return_x(x): return xx_values = return_x.expand(x=[1, 2, 3])  위의 task는 아.. 2024. 8. 20.
airflow Data-aware scheduling과 Dataset 1. Data-aware scheduling 개요데이터 셋 업데이트를 기반으로 DAG를 스케줄링할 수 있는 기능이다.아래와 같이 Denpendency Graph를 통해 데이터 셋을 업데이트하는 DAG와 데이터 셋 변경에 의해 트리거된 DAG를 시각적으로 확인할 수 있다.2. Dataset 개념데이터 업데이트를 알리고 이를 기반으로 워크플로우를 트리거하는 역할을 한다.데이터의 논리적 그룹을 나타내는 추상적인 개념으로 실제 데이터를 저장하거나, 관리하지 않는다.URI(Uniform Resource Identifier)로 정의된다. 이 URI는 데이터의 위치나 식별자 역할을 하지만, 실제 데이터를 포함하지는 않는다.3. Dataset 사용 방법3-1. Dataset 생성 방법from airflow.datase.. 2024. 8. 17.
반응형