본문 바로가기
aiflow

airflow custom sensor 개발 가이드: hbase sensor 개발하기

by kyeongseo.oh 2024. 9. 14.

airflow에서 hbase custom sensor를 개발하는 방법을 소개한다. 이 예제에서는 HBaseRowExistsSensor를 개발하여 특정 row key가 HBase 테이블에 존재하는지 확인한다. 아래 링크에서 개발한 hbase hook을 사용했다.

 

airflow custom hook 가이드 : airflow custom hook 개발 가이드: hbase hook 개발하기

 

1. Custom Sensor의 구조

custom sensor는 일반적으로 다음과 같은 구조를 가진다.

  • airflow의 BaseSensorOperator를 상속받는다.
  • __init__ 메서드를 정의해 필요한 매개변수를 설정한다.
  • poke 메서드를 구현해 실제 센싱 작업을 수행한다. mode에 poke와 reschedule가 있는 데 poke만 정의하는 이유는 reschedule도 내부적으로는 poke를 실행하기 때문이다.

2. 전체 코드

`dags/include/custom_sensors/hbase.py`에 코드를 작성한다.

 

HBaseRowExistsSensor는 HBase 테이블에서 특정 row key가 존재하는지 확인하는 역할을 수행한다.

rowkeys에 rowkey 리스트를 입력하거나, row_start, row_end로 범위를 지정할 수 있다. 

from typing import Optional, Sequence
from airflow.sensors.base import BaseSensorOperator
from include.custom_hooks.hbase_hook import HBaseHook

class HBaseRowkeySensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        table: str,
        rowkeys: Optional[Sequence[str]] = None,
        row_start: Optional[str] = None,
        row_stop: Optional[str] = None,
        hbase_conn_id: str = 'hbase_default',
        **kwargs
    ):
        super().__init__(**kwargs)
        self.table = table
        self.rowkeys = rowkeys
        self.row_start = row_start
        self.row_stop = row_stop
        self.hbase_conn_id = hbase_conn_id

        if (rowkeys and (row_start or row_stop)) or (not rowkeys and not (row_start and row_stop)):
            raise ValueError("must use only rowkeys, or only row_start and row_end.")

    def poke(self, context):
        
        hook = HBaseHook(hbase_conn_id=self.hbase_conn_id)
        conn = hook.get_conn()
        table = conn.table(self.table)

        if self.rowkeys:
            return self._check_rowkeys(table)
        else:
            return self._check_rowkey_range(table)
        
    def _check_rowkeys(self, table):
        for rowkey in self.rowkeys:
            if not table.row(rowkey):  # 하나라도 존재하지 않으면 False
                self.log.info(f"rowkey {rowkey} does not exist.")
                return False
        self.log.info("all rowkeys exist.")
        return True

    def _check_rowkey_range(self, table):
        scanner = table.scan(row_start=self.row_start, row_stop=self.row_stop, limit=1) # 조건 내에 데이터가 하나라도 있으면 True
        for key, data in scanner:
            self.log.info("rowkey exists in range.")
            return True
        self.log.info("there is no rowkey in range.")
        return False

 

3. 코드 분석

1. __init__ 메서드

  • rowkeys: 확인할 rowkey 리스트
  • row_start: row key 범위의 시작점
  • row_stop: row key 범위의 끝점
  • rowkeys와 row 범위를 동시에 사용하면 ValueError가 발생하도록 구현

 

2. poke 메서드

  • rowkeys 목록이 제공되면 _check_rowkeys 메서드 호출
  • row 범위가 제공되면 _check_rowkey_range 메서드 호출

 

3. _check_rowkeys 메서드

  • 제공된 모든 row key가 존재하는지 확인
  • 하나라도 없으면 False 반환

 

4. _check_rowkey_range 메서드

  • 지정된 범위 내에 row가 하나라도 존재하는지 확인
  • 존재하면 True, 없으면 False 반환

 

4. Custom Sensor 사용 예시

Hbase 테이블에는 rowkey001부터 rowkey010 까지의 데이터만 존재한다.sensor1은 테이블에 존재하는 rowkeys를 조회하기에 성공하고, sensor2는 존재하지 않는 rowkey 범위를 조회하기에 실패한다.

import pendulum
from airflow.decorators import dag, task
from include.custom_sensors.hbase import HBaseRowkeySensor

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "hbase_sensor"],
)
def hbase_sensor():

    sensor1 = HBaseRowkeySensor(
        task_id="sensor1", 
        hbase_conn_id="hbase", 
        table="airflow", 
        rowkeys=["row001", "row009"],
        poke_interval=10,
        timeout=30
        )

    sensor2 = HBaseRowkeySensor(
        task_id="sensor2", 
        hbase_conn_id="hbase", 
        table="airflow", 
        row_start="row011",
        row_stop="row013", 
        mode="reschedule", 
        poke_interval=10, 
        timeout=30,
        soft_fail=True
        )

    sensor1 >> sensor2

hbase_sensor()

 

DAG graph

 

sensor2의 로그 확인

[2024-09-13, 16:05:55 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-13, 16:05:56 UTC] {base.py:84} INFO - Retrieving connection 'hbase'
[2024-09-13, 16:05:56 UTC] {hbase.py:50} INFO - there is no rowkey in range.
[2024-09-13, 16:05:56 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-09-13, 16:05:56 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-09-13, 16:06:07 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-13, 16:06:07 UTC] {base.py:84} INFO - Retrieving connection 'hbase'
[2024-09-13, 16:06:07 UTC] {hbase.py:50} INFO - there is no rowkey in range.
[2024-09-13, 16:06:07 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-09-13, 16:06:07 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-09-13, 16:06:18 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-13, 16:06:18 UTC] {base.py:84} INFO - Retrieving connection 'hbase'
[2024-09-13, 16:06:18 UTC] {hbase.py:50} INFO - there is no rowkey in range.
[2024-09-13, 16:06:18 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-09-13, 16:06:18 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-09-13, 16:06:29 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-13, 16:06:29 UTC] {base.py:84} INFO - Retrieving connection 'hbase'
[2024-09-13, 16:06:29 UTC] {hbase.py:50} INFO - there is no rowkey in range.
[2024-09-13, 16:06:29 UTC] {taskinstance.py:301} INFO - Sensor has timed out; run duration of 33.564521 seconds exceeds the specified timeout of 30.0.

댓글