본문 바로가기
aiflow

airflow custom hook 개발 가이드: hbase hook 개발하기

by kyeongseo.oh 2024. 9. 12.

airflow에서 hbase custom hook을 개발하는 방법을 소개한다. 차후 provider 빌드 시 hbase custom connection type을 추가할 예정이며, 아직 hbase custom connection type은 추가하지 않았기 때문에 generic type을 통해 연결 정보를 추가한다.

 

전체 코드

from airflow.hooks.base import BaseHook
from typing import Any, Dict, Optional
import happybase

class HBaseHook(BaseHook):

    def __init__(
        self, 
        *, # 위치 기반 인수를 허용하지 않음
        hbase_conn_id: str = "hbase_default",
        host: Optional[str] = None,
        port: Optional[int] = None,
        timeout: Optional[int] = None,
        autoconnect: Optional[bool] = None,
        table_prefix: Optional[str] = None,
        table_prefix_separator: Optional[str] = None,
        compat: Optional[str] = None,
        transport: Optional[str] = None,
        protocol: Optional[str] = None,
        **kwargs: Any
    ):
        self.hbase_conn_id = hbase_conn_id
        self.host = host
        self.port = port
        self.timeout = timeout
        self.autoconnect = autoconnect
        self.table_prefix = table_prefix
        self.table_prefix_separator = table_prefix_separator
        self.compat = compat
        self.transport = transport
        self.protocol = protocol
        self.kwargs = kwargs

    def get_conn(self) -> happybase.Connection:
        connection = self.get_connection(self.hbase_conn_id)
        conn_params = {
            'host': connection.host,
            'port': connection.port,
            'timeout': connection.extra_dejson.get('timeout', None),
            'autoconnect': connection.extra_dejson.get('autoconnect', True),
            'table_prefix': connection.extra_dejson.get('table_prefix', None),
            'table_prefix_separator': connection.extra_dejson.get('table_prefix_separator', b'_'),
            'compat': connection.extra_dejson.get('compat', '0.98'),
            'transport': connection.extra_dejson.get('transport', 'buffered'),
            'protocol': connection.extra_dejson.get('protocol', 'binary')
        }

        # 사용자가 제공한 값으로 override
        for param, value in {
            'host': self.host,
            'port': self.port,
            'timeout': self.timeout,
            'autoconnect': self.autoconnect,
            'table_prefix': self.table_prefix,
            'table_prefix_separator': self.table_prefix_separator,
            'compat': self.compat,
            'transport': self.transport,
            'protocol': self.protocol
        }.items():
            if value is not None:
                conn_params[param] = value

        # 추가 kwargs 업데이트
        conn_params.update(self.kwargs)

        return happybase.Connection(**conn_params)

 

코드 설명

1. 임포트 및 클래스 정의

from airflow.hooks.base import BaseHook
from typing import Any, Dict, Optional
import happybase

class HBaseHook(BaseHook):
    ...

 

  • BaseHook을 상속받아 custom hook 클래스를 정의한다.
  • typing을 사용해 타입 힌팅을 제공한다.
  • happybase를 사용해 hbase에 접근한다.

2. 초기화 메서드

class HBaseHook(BaseHook):

    def __init__(
        self, 
        *, # 위치 기반 인수를 허용하지 않음
        hbase_conn_id: str = "hbase_default",
        host: Optional[str] = None,
        port: Optional[int] = None,
        timeout: Optional[int] = None,
        autoconnect: Optional[bool] = None,
        table_prefix: Optional[str] = None,
        table_prefix_separator: Optional[str] = None,
        compat: Optional[str] = None,
        transport: Optional[str] = None,
        protocol: Optional[str] = None,
        **kwargs: Any
    ):
        self.hbase_conn_id = hbase_conn_id
        self.host = host
        self.port = port
        self.timeout = timeout
        self.autoconnect = autoconnect
        self.table_prefix = table_prefix
        self.table_prefix_separator = table_prefix_separator
        self.compat = compat
        self.transport = transport
        self.protocol = protocol
        self.kwargs = kwargs

 

  • *을 사용해 위치 인자를 방지하고, 키워드 인자만 허용한다.
  • 기본 연결 ID 및 매개변수들을 정의한다.
  • 모든 매개변수를 인스턴스 변수로 할당해, 다른 메서드에서 이 값들에 접근할 수 있게 하고, airflow connection에서 설정한 값을 사용하거나, 인스턴스 생성 시 직접 지정한 값을 사용할 수 있게 한다.

3. get_conn 메서드

    def get_conn(self) -> happybase.Connection:
        connection = self.get_connection(self.hbase_conn_id)
        conn_params = {
            'host': connection.host,
            'port': connection.port,
            'timeout': connection.extra_dejson.get('timeout', None),
            'autoconnect': connection.extra_dejson.get('autoconnect', True),
            'table_prefix': connection.extra_dejson.get('table_prefix', None),
            'table_prefix_separator': connection.extra_dejson.get('table_prefix_separator', b'_'),
            'compat': connection.extra_dejson.get('compat', '0.98'),
            'transport': connection.extra_dejson.get('transport', 'buffered'),
            'protocol': connection.extra_dejson.get('protocol', 'binary')
        }

        # 사용자가 제공한 값으로 override
        for param, value in {
            'host': self.host,
            'port': self.port,
            'timeout': self.timeout,
            'autoconnect': self.autoconnect,
            'table_prefix': self.table_prefix,
            'table_prefix_separator': self.table_prefix_separator,
            'compat': self.compat,
            'transport': self.transport,
            'protocol': self.protocol
        }.items():
            if value is not None:
                conn_params[param] = value

        # 추가 kwargs 업데이트
        conn_params.update(self.kwargs)

        return happybase.Connection(**conn_params)

 

  • happybase.Connection 객체를 반환한다.
  • connection 객체에서 연결 정보를 가져온다. get 메서드를 사용해 가져오고, 없으면 기본값을 설정한다.
  • 사용자가 명시적으로 설정한 값으로 connection 정보를 업데이트한다.
  • 매개변수의 우선순위는 사용자가 직접 제공한 값 > Airflow connection 설정 > 기본값 순으로 정의한다.

custom hook을 사용한 DAG 개발하기

1. custom hook 클래스 코드 저장

`dags/include/custom_hooks/hbase_hook.py`에 코드를 저장한다.

코드 저장 위치는 DAG에서 호출할 수 있는 위치면 어디든 상관없다.

 

2. connection 생성

airflow ui에서 generic type으로 connection을 생성한다.

 

3. DAG 개발

custom hook class의 위치에 따라 임포트가 변경된다. 이 예제 케이스에서는 아래와 같이 custom hook을 임포트 한다.

`from include.custom_hooks.hbase_hook import HBaseHook`

import pendulum
from airflow.decorators import dag, task
from airflow.utils.edgemodifier import Label
from include.custom_hooks.hbase_hook import HBaseHook

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "hbase", "custom_hook"],
)
def hbase_dag():

    @task
    def hbase_task():
        hook = HBaseHook(hbase_conn_id="hbase")
        conn = hook.get_conn()
        print(conn.tables())
    
    hbase_task()

hbase_dag()

 

로그를 확인해 hbase와 정상적으로 연결되었는 지 확인한다.

airflow.datalake.net
*** Found local files:
***   * /opt/airflow/logs/dag_id=hbase_dag/run_id=scheduled__2024-08-09T18:00:00+00:00/task_id=hbase_task/attempt=1.log
[2024-09-12, 09:26:29 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-12, 09:26:30 UTC] {base.py:84} INFO - Retrieving connection 'hbase'
[2024-09-12, 09:26:30 UTC] {logging_mixin.py:190} INFO - [b'ATLAS_ENTITY_AUDIT_EVENTS', b'atlas_janus', b'fota', b'fota_split', b'fota_split1', b'fota_split2', b'so_info1', b't1_11', b't1_15', b't1_16', b't1_2m']
[2024-09-12, 09:26:30 UTC] {python.py:240} INFO - Done. Returned value was: None
[2024-09-12, 09:26:30 UTC] {taskinstance.py:340} ▶ Post task execution logs

댓글