본문 바로가기
aiflow

airflow connection과 hook

by kyeongseo.oh 2024. 8. 10.

connection은 airflow가 외부서비스과 통신하기 위한 url, id, password 등의 정보를 저장하는 기능이다.
사용자가 hook에 connection 정보를 입력하면, hook은 외부 시스템과의 연결을 생성한다.

 

connection 생성

connection을 생성하기 위해서는 별도의 provider를 설치해야한다. apache-airflow-providers-postgres를 설치한다.

provider를 설치한 후 airflow를 재시작하면 airflow ui에서 postgres 연결 정보를 가진 connection을 추가할 수 있다.

pip install apache-airflow-providers-postgres

 

connection type을 postgres로 한 후 연결 정보를 입력한다.

 

 

 

1.BaseHook 사용

postgresql과 통신하기 위해 psycopg2-binary를 설치한다.

pip install psycopg2-binary

 

그 후에 psycopg2에 BaseHook을 사용해 가져온 connection 정보를 입력해 postgresql과 연결한다.

import pendulum
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
import psycopg2

tz = pendulum.timezone("Asia/Seoul")

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
    tags=["kyeongseo.oh", "hook"], 
    catchup=False
    )
def postgres_hook():

    @task()
    def query_table():
        postgres_hook = BaseHook.get_connection("dd-postgres")
        conn = psycopg2.connect(host=postgres_hook.host, dbname=postgres_hook.schema ,user=postgres_hook.login ,password=postgres_hook.password ,port=postgres_hook.port)

        sql = "SELECT * FROM customer LIMIT 5;"
        
        cursor = conn.cursor()
        cursor.execute(sql) 
        results = cursor.fetchall()
        
        for row in results:
            print(row)

    query_table()

postgres_hook()

 

 

2. PostgresHook 사용

BaseHook과 다르게 connection에서 value를 추출할 필요가 없다.

import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook

tz = pendulum.timezone("Asia/Seoul")

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
    tags=["kyeongseo.oh", "hook"], 
    catchup=False
    )
def postgres_hook():

    @task()
    def query_table():
        postgres_hook = PostgresHook(postgres_conn_id="dd-postgres")
        conn = postgres_hook.get_conn()

        sql = "SELECT * FROM customer LIMIT 5;"
        
        cursor = conn.cursor()
        cursor.execute(sql) 
        results = cursor.fetchall()
        
        for row in results:
            print(row)

    query_table()

postgres_hook()

 

 

3. PostgresOperator 사용

operator에 connection id를 입력해 postgresql과 연결한다.

PostgresOperator는 DML 및 DDL 문에는 좋지만 DQL 문에는 그다지 유용하지 않다.

또한 PostgresOperator는 Deprecated 되어 SQLExecuteQueryOperator을 사용하는 것이 권장된다.

import pendulum
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python_operator import PythonOperator

tz = pendulum.timezone("Asia/Seoul")

dag = DAG(
    dag_id="postgres_operator",
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
    tags=["kyeongseo.oh", "hook"], 
    catchup=False
    )

def print_results(**context):
    ti = context['ti']
    results = ti.xcom_pull(task_ids='query_postgresql')
    for row in results:
        print(row)


query_postgres = PostgresOperator(postgres_conn_id="dd-postgres", 
                        sql="SELECT * FROM customer LIMIT 5;", 
                        task_id="query_postgresql",
                        show_return_value_in_logs=True
                    )

print_results_task = PythonOperator(
    task_id='print_results',
    python_callable=print_results,
    dag=dag
)

query_postgres >> print_results_task

 

4. SQLExecuteQueryOperator 사용

SQLExecuteQueryOperator는 다양한 SQL 데이터베이스에 대해 쿼리를 실행할 수 있는 범용 연산자로 사용 방법은 PostgresOperator와 동일하다.

import pendulum
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.python_operator import PythonOperator

tz = pendulum.timezone("Asia/Seoul")

dag = DAG(
    dag_id="postgres_operator",
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
    tags=["kyeongseo.oh", "hook"], 
    catchup=False
    )

def print_results(**context):
    ti = context['ti']
    results = ti.xcom_pull(task_ids='query_postgresql')
    for row in results:
        print(row)

query_postgres = SQLExecuteQueryOperator(conn_id="dd-postgres",
                        sql="SELECT * FROM customer LIMIT 5;", 
                        task_id="query_postgresql",
                        show_return_value_in_logs=True)

print_results_task = PythonOperator(
    task_id='print_results',
    python_callable=print_results,
    dag=dag
)

query_postgres >> print_results_task

댓글