airflow에서 hbase custom operator를 개발하는 방법을 소개한다. 이 예제에서는 HBaseBatchPutOperator와 HBaseScanOperator 두 개의 custom operator를 개발하였고, 아래 링크에서 개발한 hbase hook을 사용했다.
airflow custom hook 가이드 : airflow custom hook 개발 가이드: hbase hook 개발하기
1. Custom Operator의 구조
custom operator는 일반적으로 다음과 같은 구조를 가진다.
- airflow의 BaseOperator를 상속받는다.
- __init__ 메서드를 정의해 필요한 매개변수를 설정한다.
- execute 메서드를 구현해 실제 작업을 수행한다.
2. 전체 코드
`dags/include/custom_operators/hbase.py`에 코드를 작성한다.
HBaseBatchPutOperator는 HBase 테이블에 대량의 데이터를 일괄적으로 삽입하는 역할을 수행하고,
HBaseScanOperator는 HBase 테이블의 데이터를 스캔하고 결과를 반환하는 역할을 수행한다.
from airflow.models.baseoperator import BaseOperator
from typing import List, Tuple
from include.custom_hooks.hbase_hook import HBaseHook
class HBaseBatchPutOperator(BaseOperator):
def __init__(
self,
table_name: str,
data: List[Tuple[str, dict]],
batch_size: int = 1000,
hbase_conn_id: str = "hbase_default",
**kwargs
):
super().__init__(**kwargs)
self.table_name = table_name
self.data = data
self.batch_size = batch_size
self.hbase_conn_id = hbase_conn_id
def execute(self, context):
hook = HBaseHook(hbase_conn_id=self.hbase_conn_id)
conn = hook.get_conn()
table = conn.table(self.table_name)
with table.batch(batch_size=self.batch_size) as batch:
for row_key, data in self.data:
batch.put(row_key, data)
class HBaseScanOperator(BaseOperator):
def __init__(
self,
table_name: str,
hbase_conn_id: str = "hbase_default",
scan_kwargs: dict = {},
**kwargs
):
super().__init__(**kwargs)
self.table_name = table_name
self.hbase_conn_id = hbase_conn_id
self.scan_kwargs = scan_kwargs
def execute(self, context):
hook = HBaseHook(hbase_conn_id=self.hbase_conn_id)
conn = hook.get_conn()
table = conn.table(self.table_name)
scanned_data = list(table.scan(**self.scan_kwargs))
for data in scanned_data:
print(data)
return [{"row_key": key.decode(), "data": {k.decode(): v.decode() for k, v in data.items()}} for key, data in scanned_data]
3. HBaseBatchPutOperator 코드 분석
- BaseOperator를 상속받는다.
- hbase에 적재할 데이터는 list 형태로 입력받으며, 리스트의 각 요소는 tuple 형태여야 한다.
- HBaseHook을 사용하여 HBase 연결을 설정한다.
- 배치 모드로 데이터를 삽입한다.
4. HBaseScanOperator 코드 분석
- BaseOperator를 상속받는다.
- HBaseHook을 사용하여 HBase 연결을 설정한다.
- scan_kwargs를 사용해 scan 조건을 입력받는다. 아무런 조건을 입력하지 않으면 모든 데이터를 스캔한다.
- 스캔된 각 행의 데이터를 로그에 출력한다.
- 스캔 결과를 Python 딕셔너리 형태로 변환해 xcom에 반환한다. execute 함수에서 return하는 값은 xcom에 push된다.
5. DAG 개발
아래 DAG는 HBase 데이터베이스에 데이터를 삽입하고 스캔하는 두 가지 작업을 수행한다.
HBaseBatchPutOperator는 airflow 테이블에 10개의 row 데이터를 삽입하고,
HBaseScanOperator는 'row001'부터 'row004' 사이의 데이터를 스캔한 후 출력한다.
import pendulum
from airflow.decorators import dag, task
from include.custom_operators.hbase import HBaseBatchPutOperator, HBaseScanOperator
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "hbase", "custom_operator"],
)
def hbase_dag():
data = [
('row001', {'cf1:column1': 'value1', 'cf1:column2': 'value2'}),
('row002', {'cf1:column1': 'value3', 'cf1:column2': 'value4'}),
('row003', {'cf1:column1': 'value5', 'cf1:column2': 'value6'}),
('row004', {'cf1:column1': 'value7', 'cf1:column2': 'value8'}),
('row005', {'cf1:column1': 'value9', 'cf1:column2': 'value10'}),
('row006', {'cf1:column1': 'value11', 'cf1:column2': 'value12'}),
('row007', {'cf1:column1': 'value13', 'cf1:column2': 'value14'}),
('row008', {'cf1:column1': 'value15', 'cf1:column2': 'value16'}),
('row009', {'cf1:column1': 'value17', 'cf1:column2': 'value18'}),
('row010', {'cf1:column1': 'value19', 'cf1:column2': 'value20'}),
]
insert = HBaseBatchPutOperator(task_id="batch_insert", hbase_conn_id="hbase", table_name="airflow", batch_size=5, data=data)
scan = HBaseScanOperator(task_id="scan_data",
hbase_conn_id="hbase",
table_name="airflow",
scan_kwargs={
'row_start': 'row001',
'row_stop': 'row004',
}
)
insert >> scan
hbase_dag()
scan_data 로그
로그를 통해 'row001'부터 'row004' 사이의 데이터만 조회된 것을 확인한다.
[2024-09-12, 13:55:26 UTC] {logging_mixin.py:190} INFO - (b'row001', {b'cf1:column1': b'value1', b'cf1:column2': b'value2'})
[2024-09-12, 13:55:26 UTC] {logging_mixin.py:190} INFO - (b'row002', {b'cf1:column1': b'value3', b'cf1:column2': b'value4'})
[2024-09-12, 13:55:26 UTC] {logging_mixin.py:190} INFO - (b'row003', {b'cf1:column1': b'value5', b'cf1:column2': b'value6'})
scan_data XCom
scan 결과가 XCom에 push된 것을 확인한다.
'aiflow' 카테고리의 다른 글
Airflow Custom Connection Type 생성 및 Provider 빌드 가이드: hbase provider 개발하기 (1) | 2024.09.15 |
---|---|
airflow custom sensor 개발 가이드: hbase sensor 개발하기 (0) | 2024.09.14 |
airflow custom hook 개발 가이드: hbase hook 개발하기 (0) | 2024.09.12 |
airflow DAG 시각화 및 구조화: Log Grouping, Edge Labels, Task Groups 활용하기 (0) | 2024.09.04 |
airflow에서 remote spark cluster에 job submit 하기: SparkSubmitOperator (0) | 2024.09.03 |
댓글