This post explains how to use external systems such as Impala and S3 from Airflow.
1. Integrating with Impala
- Installing the Impala provider
Install the provider below so that an Impala connection can be created.
apache-airflow-providers-apache-impala
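For example, the provider can be installed with pip (or added to the requirements of your Airflow image, depending on the deployment):

pip install apache-airflow-providers-apache-impala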
- Creating a connection
In the host field, enter the address of the Impala coordinator. If there is no dedicated coordinator, enter the address of one of the Impala daemons.
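Connections are usually created in the Airflow UI; as a sketch, an equivalent connection could also be created from the CLI. The connection id dd-impala matches the DAG below, the host and login values are placeholders, 21050 is Impala's usual HiveServer2-compatible port, and this assumes the provider registers the impala connection type:

airflow connections add dd-impala \
    --conn-type impala \
    --conn-host <impala-coordinator-host> \
    --conn-port 21050 \
    --conn-login <username> \
    --conn-schema default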
- Querying Impala
The Impala provider does not ship an operator, only a hook, so queries against Impala have to be run through the hook.
import pendulum

from airflow.decorators import dag, task
from airflow.providers.apache.impala.hooks.impala import ImpalaHook


@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "impala"],
)
def query_impala():
    @task
    def query_impala():
        # Open a connection using the dd-impala Airflow connection.
        hook = ImpalaHook(impala_conn_id='dd-impala')
        conn = hook.get_conn()
        sql = "SELECT * FROM sensor_data;"
        # Reuse the connection's login as the Impala session user.
        username = hook.get_connection(hook.impala_conn_id).login
        cursor = conn.cursor(user=username)
        cursor.execute(sql)
        results = cursor.fetchall()
        for row in results:
            print(row)

    query_impala()


query_impala()
- Checking the logs
[2024-08-25, 07:25:46 UTC] {base.py:84} INFO - Retrieving connection 'dd-impala'
[2024-08-25, 07:25:46 UTC] {hiveserver2.py:142} INFO - Using database default as default
[2024-08-25, 07:25:46 UTC] {logging_mixin.py:190} INFO - (25,)
[2024-08-25, 07:25:46 UTC] {logging_mixin.py:190} INFO - (30,)
[2024-08-25, 07:25:46 UTC] {hiveserver2.py:303} INFO - Closing active operation
[2024-08-25, 07:25:46 UTC] {python.py:240} INFO - Done. Returned value was: None
2. Integrating with S3
- Installing the Amazon provider
Install the provider below so that an S3 connection can be created.
apache-airflow-providers-amazon
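As with the Impala provider, it can be installed with pip:

pip install apache-airflow-providers-amazon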
- Creating a connection
If you need to supply additional information such as endpoint_url or region_name, put it in the Extra field in JSON format.
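As a sketch, the Extra field might look like the JSON below; the endpoint and region values are placeholders, and endpoint_url and region_name are keys the Amazon provider reads from an aws connection's Extra:

{
    "endpoint_url": "https://s3.example.internal:9000",
    "region_name": "ap-northeast-2"
}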
- Working with S3
The example DAG below pulls a list of object keys from a bucket.
import pendulum

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "s3hook"],
)
def connect_s3():
    @task
    def connect_s3():
        # Use the kyeongseo-s3 Airflow connection for AWS credentials.
        hook = S3Hook('kyeongseo-s3')
        # List object keys in the kyeongseo bucket that start with "test".
        keys = hook.list_keys(bucket_name="kyeongseo", prefix='test')
        print("First 10 keys:")
        for key in keys[:10]:
            print(key)

    connect_s3()


connect_s3()
- Checking the logs
[2024-08-25, 08:11:30 UTC] {connection_wrapper.py:388} INFO - AWS Connection (conn_id='kyeongseo-s3', conn_type='aws') credentials retrieved from login and password.
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - First 10 keys:
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test.py
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test/
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test/README.md
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test/test.txt
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test/test2.txt
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test_configure.py
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test_cp.py
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test_ls.py
[2024-08-25, 08:11:31 UTC] {logging_mixin.py:190} INFO - test_mb.py
[2024-08-25, 08:11:31 UTC] {python.py:240} INFO - Done. Returned value was: None