본문 바로가기
aiflow

airflow custom operator 개발 가이드: hbase operator 개발하기

by kyeongseo.oh 2024. 9. 12.

airflow에서 hbase custom operator를 개발하는 방법을 소개한다. 이 예제에서는 HBaseBatchPutOperator와 HBaseScanOperator 두 개의 custom operator를 개발하였고, 아래 링크에서 개발한 hbase hook을 사용했다.

 

airflow custom hook 가이드 : airflow custom hook 개발 가이드: hbase hook 개발하기

 

1. Custom Operator의 구조

custom operator는 일반적으로 다음과 같은 구조를 가진다.

  • airflow의 BaseOperator를 상속받는다.
  • __init__ 메서드를 정의해 필요한 매개변수를 설정한다.
  • execute 메서드를 구현해 실제 작업을 수행한다.

2. 전체 코드

`dags/include/custom_operators/hbase.py`에 코드를 작성한다.

 

HBaseBatchPutOperator는 HBase 테이블에 대량의 데이터를 일괄적으로 삽입하는 역할을 수행하고,

HBaseScanOperator는 HBase 테이블의 데이터를 스캔하고 결과를 반환하는 역할을 수행한다.

from airflow.models.baseoperator import BaseOperator
from typing import List, Tuple
from include.custom_hooks.hbase_hook import HBaseHook

class HBaseBatchPutOperator(BaseOperator):
    def __init__(
        self, 
        table_name: str, 
        data: List[Tuple[str, dict]],
        batch_size: int = 1000,
        hbase_conn_id: str = "hbase_default",
        **kwargs
        ):
        super().__init__(**kwargs)
        self.table_name = table_name
        self.data = data
        self.batch_size = batch_size
        self.hbase_conn_id = hbase_conn_id

    def execute(self, context):
        hook = HBaseHook(hbase_conn_id=self.hbase_conn_id)
        conn = hook.get_conn()
        table = conn.table(self.table_name)

        with table.batch(batch_size=self.batch_size) as batch:
            for row_key, data in self.data:
                batch.put(row_key, data)


class HBaseScanOperator(BaseOperator):
    def __init__(
        self, 
        table_name: str,
        hbase_conn_id: str = "hbase_default",
        scan_kwargs: dict = {},
        **kwargs
        ):
        super().__init__(**kwargs)
        self.table_name = table_name
        self.hbase_conn_id = hbase_conn_id
        self.scan_kwargs = scan_kwargs

    def execute(self, context):
        hook = HBaseHook(hbase_conn_id=self.hbase_conn_id)
        conn = hook.get_conn()
        table = conn.table(self.table_name)
        scanned_data = list(table.scan(**self.scan_kwargs))

        for data in scanned_data:
            print(data)

        return [{"row_key": key.decode(), "data": {k.decode(): v.decode() for k, v in data.items()}} for key, data in scanned_data]

 

3. HBaseBatchPutOperator 코드 분석

  • BaseOperator를 상속받는다.
  • hbase에 적재할 데이터는 list 형태로 입력받으며, 리스트의 각 요소는 tuple 형태여야 한다.
  • HBaseHook을 사용하여 HBase 연결을 설정한다.
  • 배치 모드로 데이터를 삽입한다.

4. HBaseScanOperator 코드 분석

  • BaseOperator를 상속받는다.
  • HBaseHook을 사용하여 HBase 연결을 설정한다.
  • scan_kwargs를 사용해 scan 조건을 입력받는다. 아무런 조건을 입력하지 않으면 모든 데이터를 스캔한다.
  • 스캔된 각 행의 데이터를 로그에 출력한다.
  • 스캔 결과를 Python 딕셔너리 형태로 변환해 xcom에 반환한다. execute 함수에서 return하는 값은 xcom에 push된다.

5. DAG 개발

아래 DAG는 HBase 데이터베이스에 데이터를 삽입하고 스캔하는 두 가지 작업을 수행한다.

HBaseBatchPutOperator는 airflow 테이블에 10개의 row 데이터를 삽입하고,

HBaseScanOperator는 'row001'부터 'row004' 사이의 데이터를 스캔한 후 출력한다.

import pendulum
from airflow.decorators import dag, task
from include.custom_operators.hbase import HBaseBatchPutOperator, HBaseScanOperator

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "hbase", "custom_operator"],
)
def hbase_dag():

    data = [
        ('row001', {'cf1:column1': 'value1', 'cf1:column2': 'value2'}),
        ('row002', {'cf1:column1': 'value3', 'cf1:column2': 'value4'}),
        ('row003', {'cf1:column1': 'value5', 'cf1:column2': 'value6'}),
        ('row004', {'cf1:column1': 'value7', 'cf1:column2': 'value8'}),
        ('row005', {'cf1:column1': 'value9', 'cf1:column2': 'value10'}),
        ('row006', {'cf1:column1': 'value11', 'cf1:column2': 'value12'}),
        ('row007', {'cf1:column1': 'value13', 'cf1:column2': 'value14'}),
        ('row008', {'cf1:column1': 'value15', 'cf1:column2': 'value16'}),
        ('row009', {'cf1:column1': 'value17', 'cf1:column2': 'value18'}),
        ('row010', {'cf1:column1': 'value19', 'cf1:column2': 'value20'}),
    ]

    insert = HBaseBatchPutOperator(task_id="batch_insert", hbase_conn_id="hbase", table_name="airflow", batch_size=5, data=data)

    scan = HBaseScanOperator(task_id="scan_data", 
                            hbase_conn_id="hbase", 
                            table_name="airflow", 
                            scan_kwargs={
                                'row_start': 'row001',
                                'row_stop': 'row004',
                                }
                            )

    insert >> scan   

hbase_dag()

 

scan_data 로그

로그를 통해  'row001'부터 'row004' 사이의 데이터만 조회된 것을 확인한다.

[2024-09-12, 13:55:26 UTC] {logging_mixin.py:190} INFO - (b'row001', {b'cf1:column1': b'value1', b'cf1:column2': b'value2'})
[2024-09-12, 13:55:26 UTC] {logging_mixin.py:190} INFO - (b'row002', {b'cf1:column1': b'value3', b'cf1:column2': b'value4'})
[2024-09-12, 13:55:26 UTC] {logging_mixin.py:190} INFO - (b'row003', {b'cf1:column1': b'value5', b'cf1:column2': b'value6'})

 

scan_data XCom

scan 결과가 XCom에 push된 것을 확인한다.

댓글