Airflow13 Airflow Custom Connection Type 생성 및 Provider 빌드 가이드: hbase provider 개발하기 hbase provider를 빌드하고, hbase custom connection type을 추가하는 방법을 소개한다.이전에 개발한 hbase hook, operator, sensor를 통합해 provider로 빌드한다. 전체 코드는 다음 Git 저장소에 push 되어 있다.git : https://github.com/kyeongseooh/airflow-provider-hbase1. 프로젝트 생성poetry를 사용해 프로젝트를 생성하고, 필요한 의존성을 추가한다. pyproject.toml를 적절하게 수정해야 한다. 프로젝트 생성poetry new airflow-provider-hbasecd airflow-provider-hbase pyproject.toml 파일 수정 및 의존성 추가[tool.poetr.. 2024. 9. 15. airflow에서 remote spark cluster에 job submit 하기: SparkSubmitOperator airflow에서 SparkSubmitOperator와 task decorator를 사용해 remote spark cluster에 job을 submit하는 방법을 알아본다.spark on yarn 환경으로 spark 3.3.2, hadoop 3이 설치되어 있다. 1. spark binary 다운로드airflow에서 remote spark에 job을 submit 하기 위해서는 airflow가 설치된 서버에 spark-submit 호출이 가능해야 한다.이를 위해 spark binary를 다운받아 spark-submit이 가능하도록 했다.spark 3.3.2, hadoop 3을 사용하고 있어 spark-3.3.2-bin-hadoop3를 다운받았다.## binary 다운로드wget https://archive... 2024. 9. 3. airflow Variable 1. Variables 개요airflow에서 사용하는 key-value 저장소의 역할을 한다.airflow에서 관리하는 글로벌 환경변수로 여러 DAG에서 공유해야 하는 설정 값을 저장한다.DAG 코드의 변경 없이 런타임에 설정을 수정할 수 있다.airflow 메타 데이터베이스에 저장되기에 너무 많은 Variables는 성능에 영향을 줄 수 있다. 2. Variables과 Fernetairflow는 메타스토어 데이터베이스에 저장된 변수를 암호화하기 위해 Fernet을 사용한다.airflow 설치 시 설정한 fernetKey가 사용되며, fernetKey를 설정하지 않고 airflow를 설치한 경우에는 암호화 되지 않는다.UI 상에서도 is Encrypted의 값이 다른 것을 확인할 수 있다.[ fernet.. 2024. 9. 2. airflow helm 설치 1. helm repo 추가 및 values.yaml 다운로드# helm repo addhelm repo add apache-airflow https://airflow.apache.org # values.yaml 다운로드helm show values apache-airflow/airflow > values.yaml 2. values.yaml 수정2-1. ingress 설정# Ingress configurationingress: # Enable all ingress resources (deprecated - use ingress.web.enabled and ingress.flower.enabled) enabled: false # Configs for the Ingress of the web Servi.. 2024. 8. 29. airflow Dynamic Task Mapping 1. Dynamic Task Mapping 개요런타임에 task의 수와 argument를 동적으로 결정할 수 있다.병렬 처리를 통해 처리 시간을 단축할 수 있다.과도한 병렬 처리로 시스템 부하를 유발할 수 있으므로, 적절한 max_active_tasks 설정이 필요하다.각 task에 다른 argument를 전달할 수 있다.mapreduce의 map과 비슷한 개념이라고 볼 수 있다. 2. 주요 개념2-1. expand입력받은 argument를 각 task에 동적으로 매핑해 여러 task instance를 생성한다.아래는 expand를 사용한 간단한 예제이다.@taskdef return_x(x): return xx_values = return_x.expand(x=[1, 2, 3]) 위의 task는 아.. 2024. 8. 20. airflow Data-aware scheduling과 Dataset 1. Data-aware scheduling 개요데이터 셋 업데이트를 기반으로 DAG를 스케줄링할 수 있는 기능이다.아래와 같이 Denpendency Graph를 통해 데이터 셋을 업데이트하는 DAG와 데이터 셋 변경에 의해 트리거된 DAG를 시각적으로 확인할 수 있다.2. Dataset 개념데이터 업데이트를 알리고 이를 기반으로 워크플로우를 트리거하는 역할을 한다.데이터의 논리적 그룹을 나타내는 추상적인 개념으로 실제 데이터를 저장하거나, 관리하지 않는다.URI(Uniform Resource Identifier)로 정의된다. 이 URI는 데이터의 위치나 식별자 역할을 하지만, 실제 데이터를 포함하지는 않는다.3. Dataset 사용 방법3-1. Dataset 생성 방법from airflow.datase.. 2024. 8. 17. airflow Params 1. Airflow Params 개요DAG와 태스크에 동적으로 값을 전달하는 방법이다.런타임에 추가 Params를 제공하거나 Params 값을 덮어쓸 수 있어 DAG의 유연성이 높아진다.스케줄링된 DAG는 Params에 설정된 기본값을 사용한다.DAG를 수동으로 시작하면 Params를 수정해 DAG를 실행할 수 있다.동일한 DAG를 다른 파라미터로 여러 번 실행할 수 있어 DAG의 재사용성이 높아진다.2. Params를 사용 사례특정 날짜의 데이터를 재처리 하는 CASE가 있는 경우, 날짜를 Params로 지정한다.개발 / 테스트 / 운영 환경에 따라 다른 database를 사용하는 경우, 연결 정보를 Params로 지정한다.파라미터를 변경하며, 성능 테스트를 진행하는 경우, 파라미터를 Params로 지.. 2024. 8. 17. airflow 외부 시스템 이용하기 -2: ImpalaHook, S3Hook impala, s3 등의 외부 시스템을 이용하는 방법을 설명한다.1. Impala 연동하기impala provider 설치Impala connection을 생성하기 위해 provider를 설치합니다.apache-airflow-providers-apache-impala connection 생성하기host에는 impala coordinator의 주소를 입력합니다. coordinator가 없다면 impala daemon 중 하나의 주소를 입력합니다. Impala와 연동하기Impala provider는 operator를 제공하지 않고, hook만 제공하기에, hook을 사용해 Impala에 쿼리를 실행해야 합니다.import pendulumfrom airflow.decorators import dag, task.. 2024. 8. 17. airflow ExternalTaskSensor와 TriggerDagRunOperator ExternalTaskSensorDAG 간 의존성이 존재할 경우에 사용되는 sensor로 다른 DAG의 특정 태스크가 완료되었는 지를 확인하는 역할을 수행한다.ExternalTaskSensor은 Logical Date를 기준으로 upstream DAG의 수행을 판단한다. 스케줄링된 시간이 다르거나, 수동으로 DAG를 트리거 한 경우에는 Sensor가 제대로 동작하지 않게 된다. 아래와 같이 DAG의 Logical Date가 동일한 경우에만 ExternalTaskSensor는 upstream DAG가 수행된 것으로 판단한다는 점에 주의하자. ExternalTaskSensor 예제 코드import pendulumfrom airflow.decorators import dag, taskfrom airflow... 2024. 8. 17. airflow Xcom Xcom은 Cross-Communications의 약자로 task 간 데이터를 교환하는 데 사용되는 기능이다.key-value 형태로 key, value, timestamp, task_id, dag_id 등의 메타데이터가 db에 저장된다.xcom_push, xcom_pull 메소드를 사용해 Xcom을 push/pull 할 수 있으며, Operator의 반환 값과 taskflow에서 사용하는 argument들은 자동으로 Xcom에 저장된다. 일반적으로는 airflow 메타 디비에 저장되나, custom한 Xcom을 사용하면 s3 등의 별도 저장소에 저장할 수 있다. Xcom 사용 시 유의사항1. Xcom은 SQLAlchemy의 LargeBinary에 저장된다. - SQLite : blob type에 .. 2024. 8. 11. 이전 1 2 다음 반응형