task context는 task 실행과 관련된 메타 데이터를 포함하는 객체로 딕셔너리 형태로 제공된다.
jinja template이나 python의 get_current_context를 사용해 task context 값을 참조할 수 있다.
PythonOperator 또는 TaskFlow API의 @task 데코레이터를 사용하는 Python 함수 내부에서 Jinja 템플릿을 직접 사용할 수 없다.
따라서 이 경우에는 PythonOperator의 callable 함수에 인자로 제공하거나, kwargs 사용 혹은 get_current_context 함수를 사용 해야한다.
1. jinja template
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
tz = pendulum.timezone("Asia/Seoul")
dag = DAG(
dag_id="context",
schedule="@once",
start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
tags=["kyeongseo.oh", "context"],
catchup=False
)
# execution_date 출력 (UTC 기준으로 출력)
bash1 = BashOperator(bash_command="echo '{{execution_date}}'", task_id="task1", dag=dag)
# execution_date를 yyyy-MM-dd 형태로 출력 (UTC 기준으로 출력)
bash2 = BashOperator(bash_command="echo '{{execution_date | ds}}'", task_id="task2", dag=dag)
# execution_date을 Asia/Seoul timezone으로 출력
bash3 = BashOperator(bash_command="echo '{{execution_date.in_timezone('Asia/Seoul')}}'", task_id="task3", dag=dag)
bash1 >> bash2 >> bash3
각 task에서 아래와 같은 결과가 출력된다.
bash1 : 2024-08-09 09:00:00+00:00
bash2 : 2024-08-09
bash3 : 2024-08-09 18:00:00+09:00
2. kwargs
python 함수가 kwargs를 입력 받도록 설계되어 있다면 함수가 실행될 때 task context는 딕셔너리 형태로 함수에 전달된다.
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
tz = pendulum.timezone("Asia/Seoul")
dag = DAG(
dag_id="context",
schedule="@once",
start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
tags=["kyeongseo.oh", "context"],
catchup=False
)
# kwargs는 다른 문자로 변경해서 사용해도 무관함
# *args, **kwargs에서 *는 인자를 tuple로 받는다는 의미이고, **는 인자를 딕셔너리로 받겠다는 의미임
def print_context(**kwargs):
print(kwargs["execution_date"])
task1 = PythonOperator(python_callable=print_context, task_id="task1", dag=dag)
task1
task의 수행 결과는 아래와 같다.
2024-08-09 09:00:00+00:00
3. callable 함수에 인자 제공
op_args 혹은 op_kwargs 파라미터를 사용해 callable 함수에 argument를 넘길 수 있다.
op_args는 list를 op_kwargs는 dictionary를 전달받는다.
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
tz = pendulum.timezone("Asia/Seoul")
dag = DAG(
dag_id="context",
schedule="@once",
start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
tags=["kyeongseo.oh", "context"],
catchup=False
)
# kwargs는 다른 문자로 변경해서 사용해도 무관함
# *args, **kwargs에서 *는 인자를 tuple로 받는다는 의미이고, **는 인자를 딕셔너리로 받겠다는 의미임
def print_context(context_execution):
print(context_execution)
# print_context("2024-08-09 09:00:00+00:00")를 실행한 것과 동일
task1 = PythonOperator(python_callable=print_context, op_args=["{{execution_date}}"] ,task_id="task1", dag=dag)
# print_context(context_execution="2024-08-09 09:00:00+00:00")를 실행한 것과 동일
task2 = PythonOperator(python_callable=print_context, op_kwargs={ "context_execution" : "{{execution_date}}" } ,task_id="task2", dag=dag)
task1 >> task2
Detail 탭에서 아래와 같이 전달된 argument를 확인할 수 있다.
4. get_current_context 방식
import pendulum
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
tz = pendulum.timezone("Asia/Seoul")
def local_tz(date):
return date.in_timezone('Asia/Seoul').strftime("%Y-%m-%d %H:%M:%S")
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
tags=["kyeongseo.oh", "context"],
catchup=False
)
def print_task_context():
@task()
def print_context():
context = get_current_context()
print(f"execution_date : {context['execution_date']}")
print(f"Asia/Seoul execution_date : {local_tz(context['execution_date'])}")
print_context()
print_task_context()
task의 수행 결과는 아래와 같다.
execution_date : 2024-08-09 09:00:00+00:00
Asia/Seoul execution_date : 2024-08-09 18:00:00
모든 task context를 출력하는 방법
import pendulum
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
tz = pendulum.timezone("Asia/Seoul")
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 9, 18, tz=tz),
tags=["kyeongseo.oh", "context"],
catchup=False
)
def print_task_context():
@task()
def print_context():
context = get_current_context()
for k, v in context.items():
print(f"{k} : {v}")
print_context()
print_task_context()
실행 결과는 아래와 같다. 각 context에 대한 자세한 내용은 airflow 공식 docs를 확인하자.
conf : <airflow.configuration.AirflowConfigParser object at 0x7f877d9ebfa0>
dag : <DAG: print_task_context>
dag_run : <DagRun print_task_context @ 2024-08-09 09:00:00+00:00: scheduled__2024-08-09T09:00:00+00:00, state:running, queued_at: 2024-08-10 20:22:31.328616+00:00. externally triggered: False>
data_interval_end : 2024-08-09 09:00:00+00:00
data_interval_start : 2024-08-09 09:00:00+00:00
ds : 2024-08-09
ds_nodash : 20240809
execution_date : 2024-08-09 09:00:00+00:00
expanded_ti_count : None
inlets : []
logical_date : 2024-08-09 09:00:00+00:00
macros : <module 'airflow.macros' from '/opt/airflow/.local/lib/python3.8/site-packages/airflow/macros/__init__.py'>
map_index_template : None
next_ds : None
next_ds_nodash : None
next_execution_date : None
outlets : []
params : {}
prev_data_interval_start_success : None
prev_data_interval_end_success : None
prev_ds : None
prev_ds_nodash : None
prev_execution_date : None
prev_execution_date_success : None
prev_start_date_success : None
prev_end_date_success : None
run_id : scheduled__2024-08-09T09:00:00+00:00
task : <Task(_PythonDecoratedOperator): print_context>
task_instance : <TaskInstance: print_task_context.print_context scheduled__2024-08-09T09:00:00+00:00 [running]>
task_instance_key_str : print_task_context__print_context__20240809
test_mode : False
ti : <TaskInstance: print_task_context.print_context scheduled__2024-08-09T09:00:00+00:00 [running]>
tomorrow_ds : 2024-08-10
tomorrow_ds_nodash : 20240810
triggering_dataset_events : defaultdict(<class 'list'>, {})
ts : 2024-08-09T09:00:00+00:00
ts_nodash : 20240809T090000
ts_nodash_with_tz : 20240809T090000+0000
var : {'json': None, 'value': None}
conn : None
yesterday_ds : 2024-08-08
yesterday_ds_nodash : 20240808
templates_dict : None
'aiflow' 카테고리의 다른 글
airflow task 의존성 (0) | 2024.08.11 |
---|---|
airflow connection과 hook (0) | 2024.08.10 |
airflow catchup & backfill (0) | 2024.08.10 |
airflow 실행 날짜 이해: DAG 실행 시간과 execution_date 비교 (0) | 2024.08.09 |
airflow 스케줄링 : preset, cron, 빈도 기반 (0) | 2024.08.09 |
댓글