본문 바로가기
aiflow

Airflow Custom Connection Type 생성 및 Provider 빌드 가이드: hbase provider 개발하기

by kyeongseo.oh 2024. 9. 15.

hbase provider를 빌드하고, hbase custom connection type을 추가하는 방법을 소개한다.

이전에 개발한 hbase hook, operator, sensor를 통합해 provider로 빌드한다.

 

전체 코드는 다음 Git 저장소에 push 되어 있다.

git : https://github.com/kyeongseooh/airflow-provider-hbase

1. 프로젝트 생성

poetry를 사용해 프로젝트를 생성하고, 필요한 의존성을 추가한다. pyproject.toml를 적절하게 수정해야 한다.

 

  • 프로젝트 생성
poetry new airflow-provider-hbase
cd airflow-provider-hbase

 

  • pyproject.toml 파일 수정 및 의존성 추가
[tool.poetry]
# 프로젝트의 메타데이터를 정의하는 섹션
name = "airflow-provider-hbase"  # 패키지의 이름
version = "0.1.0"  # 패키지의 버전
description = ""  # 패키지에 대한 간단한 설명 (필요시 추가)
authors = ["Kyeongseo Oh <kyeongseo96@gmail.com>"]  # 작성자 정보
readme = "README.md"  # README 파일의 위치
packages = [{include = "airflow_provider_hbase"}] # 패키지에 포함될 Python 모듈

[tool.poetry.dependencies]
# 프로젝트의 런타임 의존성을 정의하는 섹션
python = ">=3.8,<3.13"  # 지원하는 Python 버전 범위
apache-airflow = "^2.0" # apache-airflow 의존성 추가
happybase = "^1.0.0"    # happybase 의존성 추가

[build-system]
# 빌드 시스템 요구사항을 지정하는 섹션
requires = ["poetry-core"]  # 필요한 빌드 시스템 (poetry-core 사용)
build-backend = "poetry.core.masonry.api"  # 사용할 빌드 백엔드

[tool.poetry.plugins."apache_airflow_provider"]
# Airflow provider 플러그인 설정
provider_info = "airflow_provider_hbase.__init__:get_provider_info" # provider_info 함수의 위치

 

2. 프로젝트 구조 설정

아래와 같은 프로젝트 구조를 만든다.

 

각 파일에는 개발한 코드를 추가한다.

airflow-provider-hbase/
├── airflow_provider_hbase
│   ├── hooks
│   │   ├── hbase.py
│   │   └── __init__.py
│   ├── __init__.py
│   ├── operators
│   │   ├── hbase.py
│   │   └── __init__.py
│   └── sensors
│       ├── hbase.py
│       └── __init__.py
├── pyproject.toml
├── README.md
└── tests
    └── __init__.py

 

3. Provider 메타데이터 설정

`airflow_provider_hbase/__init__.py` 파일에 다음 내용을 추가한다.

 

get_provider_info 함수는 airflow에 프로바이더에 대한 정보를 제공한다. 이를 통해 airflow는 프로바이더를 식별하고, 필요한 컴포넌트를 동적으로 로드할 수 있다.

def get_provider_info():
    return {
        # PyPI에 등록될 패키지 이름. Airflow 생태계에서 이 프로바이더를 식별하는 데 사용됨
        "package-name": "airflow-provider-hbase",
        "name": "Hbase", # Airflow UI나 로그에서 이 프로바이더를 지칭할 때 사용됨
        "description": "apache airflow custom hbase provider",
        
        # 이 프로바이더가 제공하는 연결 유형에 대한 정보
        "connection-types": [
            {
                # Airflow에서 이 연결을 식별하는 데 사용되는 고유 식별자
                "connection-type": "hbase",
                # 이 연결 유형에 대한 Hook 클래스의 전체 경로
                "hook-class-name": "airflow_provider_hbase.hooks.hbase.HBaseHook"
            }
        ],
    }

 

4. Custom Connection Type 생성

`hooks/hbase.py` 파일에 다음 내용을 추가한다.

 

  • 연결 속성 정의: HBase 연결을 위한 기본 속성들을 설정한다.
  • UI 위젯 생성: get_connection_form_widgets 메서드를 통해 airflow ui에 추가 필드(transport, protocol)를 생성한다.
  • UI 필드 동작 정의: get_ui_field_behaviour 메서드로 UI에서의 필드 동작을 커스터마이즈한다. 숨길 필드, 기본값 등을 지정한다.
from airflow.hooks.base import BaseHook
from typing import Any, Dict, Optional
import happybase

class HBaseHook(BaseHook):
    
    conn_name_attr = "hbase_conn_id"
    default_conn_name = "hbase_default"
    conn_type = "hbase"
    hook_name = "Hbase"
    
    @classmethod
    def get_connection_form_widgets(cls) -> Dict[str, Any]:
        # ui에 표시될 추가 필드를 생성한다
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import StringField

        return {
            "transport": StringField(lazy_gettext("Transport"), widget=BS3TextFieldWidget()),
            "protocol": StringField(lazy_gettext("Protocol"), widget=BS3TextFieldWidget()),
        }

    @classmethod
    def get_ui_field_behaviour(cls) -> Dict[str, Any]:
        # ui에서 숨길 필드, 필드의 기본값 등을 지정한다.
        return {
            "hidden_fields": ["password", "login", "schema"],
            "relabeling": {},
            "placeholders": {
                "host": "localhost",
                "port": "9090", # placeholders는 문자열로 선언해야 한다.
                "transport": "buffered",
                "protocol": "binary",
            },
        }
        
    def __init__(...):
        # 기존 코드 그대로

    def get_conn(self):
    	# 기존 코드 그대로

 

5. Provider 패키징

apache-airflow-providers-hbase 디렉토리에서 아래 명령어로 wheel 파일을 생성한다.

이 명령어를 실행하면 dist 디렉토리 내에 .whl 확장자를 가진 wheel 파일이 생성된다.

poetry build

 

6. Airflow에 Provider 설치

생성된 wheel 파일을 airflow 환경에 설치하고, provider를 적용하기 위해 airflow를 재시작한다.

pip install dist/airflow_provider_hbase-0.1.0-py3-none-any.whl

 

provider가 등록되었는 지 확인한다.

 

Hbase connection type이 추가되었는 지 확인한다.

 

7. Airflow에서 사용해보기

DAG를 작성하여 custom provider를 테스트한다.

 

우선 hbase와 연결하기 위해 hbase connection을 생성한다. host에는 hbase thrift 서버의 주소를 입력하고, port에는 hbase thrift 서버의 port를 작성한다.

 

 

예제 DAG

아래 DAG는 operator를 사용해 hbase 테이블에 데이터를 일괄 삽입하고, sensor를 사용해 데이터가 적재되었는 지 확인한다. 마지막으로 지정된 행 범위의 데이터를 scan해 출력한다.

import pendulum
from airflow.decorators import dag, task
from airflow_provider_hbase.operators.hbase import HBaseBatchPutOperator, HBaseScanOperator
from airflow_provider_hbase.sensors.hbase import HBaseRowkeySensor

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "hbase", "custom_provider"],
)
def hbase_dag():

    data = [
        ('row011', {'cf1:column1': 'value1', 'cf1:column2': 'value2'}),
        ('row012', {'cf1:column1': 'value3', 'cf1:column2': 'value4'}),
        ('row013', {'cf1:column1': 'value5', 'cf1:column2': 'value6'}),
        ('row014', {'cf1:column1': 'value7', 'cf1:column2': 'value8'}),
        ('row015', {'cf1:column1': 'value9', 'cf1:column2': 'value10'}),
        ('row016', {'cf1:column1': 'value11', 'cf1:column2': 'value12'}),
        ('row017', {'cf1:column1': 'value13', 'cf1:column2': 'value14'}),
        ('row018', {'cf1:column1': 'value15', 'cf1:column2': 'value16'}),
        ('row019', {'cf1:column1': 'value17', 'cf1:column2': 'value18'}),
        ('row020', {'cf1:column1': 'value19', 'cf1:column2': 'value20'}),
    ]

    insert = HBaseBatchPutOperator(task_id="batch_insert", hbase_conn_id="hbase", table_name="airflow", batch_size=5, data=data)

    sensor = HBaseRowkeySensor(
        task_id="sensor", 
        hbase_conn_id="hbase_default", 
        table="airflow", 
        row_start="row011",
        row_stop="row021", 
        mode="reschedule", 
        poke_interval=10, 
        timeout=30,
        soft_fail=True
        )

    scan = HBaseScanOperator(task_id="scan_data", 
                            hbase_conn_id="hbase", 
                            table_name="airflow", 
                            scan_kwargs={
                                'row_start': 'row011',
                                'row_stop': 'row021',
                                }
                            )

    insert >> sensor >> scan

hbase_dag()

 

 

댓글