airflow에서 hbase custom sensor를 개발하는 방법을 소개한다. 이 예제에서는 HBaseRowExistsSensor를 개발하여 특정 row key가 HBase 테이블에 존재하는지 확인한다. 아래 링크에서 개발한 hbase hook을 사용했다.
airflow custom hook 가이드 : airflow custom hook 개발 가이드: hbase hook 개발하기
1. Custom Sensor의 구조
custom sensor는 일반적으로 다음과 같은 구조를 가진다.
- airflow의 BaseSensorOperator를 상속받는다.
- __init__ 메서드를 정의해 필요한 매개변수를 설정한다.
- poke 메서드를 구현해 실제 센싱 작업을 수행한다. mode에 poke와 reschedule가 있는 데 poke만 정의하는 이유는 reschedule도 내부적으로는 poke를 실행하기 때문이다.
2. 전체 코드
`dags/include/custom_sensors/hbase.py`에 코드를 작성한다.
HBaseRowExistsSensor는 HBase 테이블에서 특정 row key가 존재하는지 확인하는 역할을 수행한다.
rowkeys에 rowkey 리스트를 입력하거나, row_start, row_end로 범위를 지정할 수 있다.
from typing import Optional, Sequence
from airflow.sensors.base import BaseSensorOperator
from include.custom_hooks.hbase_hook import HBaseHook
class HBaseRowkeySensor(BaseSensorOperator):
def __init__(
self,
*,
table: str,
rowkeys: Optional[Sequence[str]] = None,
row_start: Optional[str] = None,
row_stop: Optional[str] = None,
hbase_conn_id: str = 'hbase_default',
**kwargs
):
super().__init__(**kwargs)
self.table = table
self.rowkeys = rowkeys
self.row_start = row_start
self.row_stop = row_stop
self.hbase_conn_id = hbase_conn_id
if (rowkeys and (row_start or row_stop)) or (not rowkeys and not (row_start and row_stop)):
raise ValueError("must use only rowkeys, or only row_start and row_end.")
def poke(self, context):
hook = HBaseHook(hbase_conn_id=self.hbase_conn_id)
conn = hook.get_conn()
table = conn.table(self.table)
if self.rowkeys:
return self._check_rowkeys(table)
else:
return self._check_rowkey_range(table)
def _check_rowkeys(self, table):
for rowkey in self.rowkeys:
if not table.row(rowkey): # 하나라도 존재하지 않으면 False
self.log.info(f"rowkey {rowkey} does not exist.")
return False
self.log.info("all rowkeys exist.")
return True
def _check_rowkey_range(self, table):
scanner = table.scan(row_start=self.row_start, row_stop=self.row_stop, limit=1) # 조건 내에 데이터가 하나라도 있으면 True
for key, data in scanner:
self.log.info("rowkey exists in range.")
return True
self.log.info("there is no rowkey in range.")
return False
3. 코드 분석
1. __init__ 메서드
- rowkeys: 확인할 rowkey 리스트
- row_start: row key 범위의 시작점
- row_stop: row key 범위의 끝점
- rowkeys와 row 범위를 동시에 사용하면 ValueError가 발생하도록 구현
2. poke 메서드
- rowkeys 목록이 제공되면 _check_rowkeys 메서드 호출
- row 범위가 제공되면 _check_rowkey_range 메서드 호출
3. _check_rowkeys 메서드
- 제공된 모든 row key가 존재하는지 확인
- 하나라도 없으면 False 반환
4. _check_rowkey_range 메서드
- 지정된 범위 내에 row가 하나라도 존재하는지 확인
- 존재하면 True, 없으면 False 반환
4. Custom Sensor 사용 예시
Hbase 테이블에는 rowkey001부터 rowkey010 까지의 데이터만 존재한다.sensor1은 테이블에 존재하는 rowkeys를 조회하기에 성공하고, sensor2는 존재하지 않는 rowkey 범위를 조회하기에 실패한다.
import pendulum
from airflow.decorators import dag, task
from include.custom_sensors.hbase import HBaseRowkeySensor
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "hbase_sensor"],
)
def hbase_sensor():
sensor1 = HBaseRowkeySensor(
task_id="sensor1",
hbase_conn_id="hbase",
table="airflow",
rowkeys=["row001", "row009"],
poke_interval=10,
timeout=30
)
sensor2 = HBaseRowkeySensor(
task_id="sensor2",
hbase_conn_id="hbase",
table="airflow",
row_start="row011",
row_stop="row013",
mode="reschedule",
poke_interval=10,
timeout=30,
soft_fail=True
)
sensor1 >> sensor2
hbase_sensor()
sensor2의 로그 확인
[2024-09-13, 16:05:55 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-13, 16:05:56 UTC] {base.py:84} INFO - Retrieving connection 'hbase'
[2024-09-13, 16:05:56 UTC] {hbase.py:50} INFO - there is no rowkey in range.
[2024-09-13, 16:05:56 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-09-13, 16:05:56 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-09-13, 16:06:07 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-13, 16:06:07 UTC] {base.py:84} INFO - Retrieving connection 'hbase'
[2024-09-13, 16:06:07 UTC] {hbase.py:50} INFO - there is no rowkey in range.
[2024-09-13, 16:06:07 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-09-13, 16:06:07 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-09-13, 16:06:18 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-13, 16:06:18 UTC] {base.py:84} INFO - Retrieving connection 'hbase'
[2024-09-13, 16:06:18 UTC] {hbase.py:50} INFO - there is no rowkey in range.
[2024-09-13, 16:06:18 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-09-13, 16:06:18 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-09-13, 16:06:29 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-13, 16:06:29 UTC] {base.py:84} INFO - Retrieving connection 'hbase'
[2024-09-13, 16:06:29 UTC] {hbase.py:50} INFO - there is no rowkey in range.
[2024-09-13, 16:06:29 UTC] {taskinstance.py:301} INFO - Sensor has timed out; run duration of 33.564521 seconds exceeds the specified timeout of 30.0.
'aiflow' 카테고리의 다른 글
S3에 XCom 저장하기: XComObjectStorageBackend 설정 가이드 (1) | 2024.09.16 |
---|---|
Airflow Custom Connection Type 생성 및 Provider 빌드 가이드: hbase provider 개발하기 (1) | 2024.09.15 |
airflow custom operator 개발 가이드: hbase operator 개발하기 (1) | 2024.09.12 |
airflow custom hook 개발 가이드: hbase hook 개발하기 (0) | 2024.09.12 |
airflow DAG 시각화 및 구조화: Log Grouping, Edge Labels, Task Groups 활용하기 (0) | 2024.09.04 |
댓글