본문 바로가기
aiflow

airflow task context: jinja template, get_current_context

by kyeongseo.oh 2024. 8. 10.

task context는 task 실행과 관련된 메타 데이터를 포함하는 객체로 딕셔너리 형태로 제공된다. 
jinja template이나 python의 get_current_context를 사용해 task context 값을 참조할 수 있다.

 

PythonOperator 또는 TaskFlow API의 @task 데코레이터를 사용하는 Python 함수 내부에서 Jinja 템플릿을 직접 사용할 수 없다.

따라서 이 경우에는 PythonOperator의 callable 함수에 인자로 제공하거나, kwargs 사용 혹은 get_current_context 함수를 사용 해야한다.

 

1. jinja template

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

tz = pendulum.timezone("Asia/Seoul")


dag = DAG(
    dag_id="context",
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
    tags=["kyeongseo.oh", "context"], 
    catchup=False
    )

# execution_date 출력 (UTC 기준으로 출력)
bash1 = BashOperator(bash_command="echo '{{execution_date}}'", task_id="task1", dag=dag)

# execution_date를 yyyy-MM-dd 형태로 출력 (UTC 기준으로 출력)
bash2 = BashOperator(bash_command="echo '{{execution_date | ds}}'", task_id="task2", dag=dag)

# execution_date을 Asia/Seoul timezone으로 출력
bash3 = BashOperator(bash_command="echo '{{execution_date.in_timezone('Asia/Seoul')}}'", task_id="task3", dag=dag)

bash1 >> bash2 >> bash3

 

각 task에서 아래와 같은 결과가 출력된다.

bash1 : 2024-08-09 09:00:00+00:00
bash2 : 2024-08-09
bash3 : 2024-08-09 18:00:00+09:00

 

2. kwargs 

python 함수가 kwargs를 입력 받도록 설계되어 있다면 함수가 실행될 때 task context는 딕셔너리 형태로 함수에 전달된다.

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

tz = pendulum.timezone("Asia/Seoul")


dag = DAG(
    dag_id="context",
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
    tags=["kyeongseo.oh", "context"], 
    catchup=False
    )

# kwargs는 다른 문자로 변경해서 사용해도 무관함
# *args, **kwargs에서 *는 인자를 tuple로 받는다는 의미이고, **는 인자를 딕셔너리로 받겠다는 의미임
def print_context(**kwargs):
    print(kwargs["execution_date"])

task1 = PythonOperator(python_callable=print_context, task_id="task1", dag=dag)

task1

 

task의 수행 결과는 아래와 같다.

2024-08-09 09:00:00+00:00

 

3. callable 함수에 인자 제공

op_args 혹은 op_kwargs 파라미터를 사용해 callable 함수에 argument를 넘길 수 있다.

op_args는 list를 op_kwargs는 dictionary를 전달받는다.

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

tz = pendulum.timezone("Asia/Seoul")


dag = DAG(
    dag_id="context",
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
    tags=["kyeongseo.oh", "context"], 
    catchup=False
    )

# kwargs는 다른 문자로 변경해서 사용해도 무관함
# *args, **kwargs에서 *는 인자를 tuple로 받는다는 의미이고, **는 인자를 딕셔너리로 받겠다는 의미임
def print_context(context_execution):
    print(context_execution)

# print_context("2024-08-09 09:00:00+00:00")를 실행한 것과 동일
task1 = PythonOperator(python_callable=print_context, op_args=["{{execution_date}}"] ,task_id="task1", dag=dag)

# print_context(context_execution="2024-08-09 09:00:00+00:00")를 실행한 것과 동일
task2 = PythonOperator(python_callable=print_context, op_kwargs={ "context_execution" : "{{execution_date}}" } ,task_id="task2", dag=dag)

task1 >> task2

 

Detail 탭에서 아래와 같이 전달된 argument를 확인할 수 있다.

 

task1

 

task2

 

4. get_current_context 방식

import pendulum
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

tz = pendulum.timezone("Asia/Seoul")

def local_tz(date):
    return date.in_timezone('Asia/Seoul').strftime("%Y-%m-%d %H:%M:%S")

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
    tags=["kyeongseo.oh", "context"], 
    catchup=False
    )

def print_task_context():

    @task()
    def print_context():
    
        context = get_current_context()
        
        print(f"execution_date : {context['execution_date']}")
        print(f"Asia/Seoul execution_date : {local_tz(context['execution_date'])}")

    print_context()

print_task_context()

 

task의 수행 결과는 아래와 같다.

execution_date : 2024-08-09 09:00:00+00:00
Asia/Seoul execution_date : 2024-08-09 18:00:00

 

모든 task context를 출력하는 방법

import pendulum
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

tz = pendulum.timezone("Asia/Seoul")

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
    tags=["kyeongseo.oh", "context"], 
    catchup=False
    )
def print_task_context():

    @task()
    def print_context():
        context = get_current_context()

        for k, v in context.items():
            print(f"{k} : {v}")
        
    print_context()

print_task_context()

 

실행 결과는 아래와 같다. 각 context에 대한 자세한 내용은 airflow 공식 docs를 확인하자.

conf : <airflow.configuration.AirflowConfigParser object at 0x7f877d9ebfa0>
dag : <DAG: print_task_context>
dag_run : <DagRun print_task_context @ 2024-08-09 09:00:00+00:00: scheduled__2024-08-09T09:00:00+00:00, state:running, queued_at: 2024-08-10 20:22:31.328616+00:00. externally triggered: False>
data_interval_end : 2024-08-09 09:00:00+00:00
data_interval_start : 2024-08-09 09:00:00+00:00
ds : 2024-08-09
ds_nodash : 20240809
execution_date : 2024-08-09 09:00:00+00:00
expanded_ti_count : None
inlets : []
logical_date : 2024-08-09 09:00:00+00:00
macros : <module 'airflow.macros' from '/opt/airflow/.local/lib/python3.8/site-packages/airflow/macros/__init__.py'>
map_index_template : None
next_ds : None
next_ds_nodash : None
next_execution_date : None
outlets : []
params : {}
prev_data_interval_start_success : None
prev_data_interval_end_success : None
prev_ds : None
prev_ds_nodash : None
prev_execution_date : None
prev_execution_date_success : None
prev_start_date_success : None
prev_end_date_success : None
run_id : scheduled__2024-08-09T09:00:00+00:00
task : <Task(_PythonDecoratedOperator): print_context>
task_instance : <TaskInstance: print_task_context.print_context scheduled__2024-08-09T09:00:00+00:00 [running]>
task_instance_key_str : print_task_context__print_context__20240809
test_mode : False
ti : <TaskInstance: print_task_context.print_context scheduled__2024-08-09T09:00:00+00:00 [running]>
tomorrow_ds : 2024-08-10
tomorrow_ds_nodash : 20240810
triggering_dataset_events : defaultdict(<class 'list'>, {})
ts : 2024-08-09T09:00:00+00:00
ts_nodash : 20240809T090000
ts_nodash_with_tz : 20240809T090000+0000
var : {'json': None, 'value': None}
conn : None
yesterday_ds : 2024-08-08
yesterday_ds_nodash : 20240808
templates_dict : None

댓글