
Using External Systems from Airflow - 2: ImpalaHook, S3Hook

by kyeongseo.oh 2024. 8. 17.

This post explains how to use external systems such as Impala and S3 from Airflow.


1. Connecting to Impala

  • Installing the Impala provider

Install the provider so that an Impala connection can be created.

apache-airflow-providers-apache-impala
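Assuming pip manages your Airflow environment, the provider can be installed with:

pip install apache-airflow-providers-apache-impala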

 

  • Creating a connection

In the Host field, enter the address of the Impala coordinator. If there is no coordinator, enter the address of one of the Impala daemons instead.
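Instead of the web UI, the connection can also be created with the Airflow CLI. A minimal sketch, assuming the default Impala client port 21050; the host and login values here are placeholders:

airflow connections add 'dd-impala' \
    --conn-type 'impala' \
    --conn-host 'impala-coordinator.example.com' \
    --conn-port 21050 \
    --conn-login 'impala_user' \
    --conn-schema 'default'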

 

  • Working with Impala

The Impala provider ships only a hook, not an operator, so queries have to be run against Impala through the hook.

import pendulum
from airflow.decorators import dag, task
from airflow.providers.apache.impala.hooks.impala import ImpalaHook

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "impala"],
)
def query_impala():

    @task
    def query_impala():
        # The hook builds a DB-API connection from the Airflow connection.
        hook = ImpalaHook(impala_conn_id='dd-impala')
        conn = hook.get_conn()

        sql = "SELECT * FROM sensor_data;"

        # Read the login from the Airflow connection and pass it as the
        # Impala user when opening the cursor.
        username = hook.get_connection(hook.impala_conn_id).login

        cursor = conn.cursor(user=username)
        cursor.execute(sql)
        results = cursor.fetchall()

        for row in results:
            print(row)

    query_impala()

query_impala()
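ImpalaHook subclasses DbApiHook, so the same query can also be written more compactly with the inherited convenience methods. A minimal sketch under that assumption (note it does not pass an explicit user to the cursor):

hook = ImpalaHook(impala_conn_id='dd-impala')
# get_records() opens a connection, runs the query, and returns all rows.
for row in hook.get_records("SELECT * FROM sensor_data;"):
    print(row)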

 

  • Checking the logs
[2024-08-25, 07:25:46 UTC] {base.py:84} INFO - Retrieving connection 'dd-impala'
[2024-08-25, 07:25:46 UTC] {hiveserver2.py:142} INFO - Using database default as default
[2024-08-25, 07:25:46 UTC] {logging_mixin.py:190} INFO - (25,)
[2024-08-25, 07:25:46 UTC] {logging_mixin.py:190} INFO - (30,)
[2024-08-25, 07:25:46 UTC] {hiveserver2.py:303} INFO - Closing active operation
[2024-08-25, 07:25:46 UTC] {python.py:240} INFO - Done. Returned value was: None

2. Connecting to S3

  • Installing the Amazon provider

Install the provider so that an S3 connection can be created.

apache-airflow-providers-amazon
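As with the Impala provider, this can be installed via pip:

pip install apache-airflow-providers-amazon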

 

  • Creating a connection

If you need to supply information such as endpoint_url or region_name, add it to the Extra field in JSON format.
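For example, the Extra field might look like the following; both values are placeholders, assuming an S3-compatible endpoint:

{
  "endpoint_url": "https://s3.example.com",
  "region_name": "ap-northeast-2"
}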

 

  • Working with S3

This is an example DAG that lists the object keys in a bucket.

import pendulum
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "s3hook"],
)
def connect_s3():

    @task
    def connect_s3():
        # The first positional argument is the Airflow connection id.
        hook = S3Hook('kyeongseo-s3')

        # list_keys() returns the object keys in the bucket that match the prefix.
        keys = hook.list_keys(bucket_name="kyeongseo", prefix='test')

        print("First 10 keys:")
        for key in keys[:10]:
            print(key)

    connect_s3()

connect_s3()
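The hook also covers common single-object operations. A minimal sketch for reading and writing one object; the key for the upload is a placeholder:

hook = S3Hook('kyeongseo-s3')

# read_key() returns the object's content as a string.
content = hook.read_key(key="test/README.md", bucket_name="kyeongseo")

# load_string() uploads a string as an object; replace=True overwrites.
hook.load_string("hello", key="test/hello.txt", bucket_name="kyeongseo", replace=True)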

 

  • Checking the logs
[2024-08-25, 08:11:30 UTC] {connection_wrapper.py:388} INFO - AWS Connection (conn_id='kyeongseo-s3', conn_type='aws') credentials retrieved from login and password.
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - First 10 keys:
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test.py
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test/
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test/README.md
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test/test.txt
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test/test2.txt
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test_configure.py
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test_cp.py
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test_ls.py
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test_mb.py
[2024-08-25, 08:11:31 UTC] {python.py:240} INFO - Done. Returned value was: None
