aiflow25 airflow task context: jinja template, get_current_context task context는 task 실행과 관련된 메타 데이터를 포함하는 객체로 딕셔너리 형태로 제공된다. jinja template이나 python의 get_current_context를 사용해 task context 값을 참조할 수 있다. PythonOperator 또는 TaskFlow API의 @task 데코레이터를 사용하는 Python 함수 내부에서 Jinja 템플릿을 직접 사용할 수 없다.따라서 이 경우에는 PythonOperator의 callable 함수에 인자로 제공하거나, kwargs 사용 혹은 get_current_context 함수를 사용 해야한다. 1. jinja templateimport pendulumfrom airflow import DAGfrom airflow.operators.. 2024. 8. 10. airflow catchup & backfill catchup은 dag가 활성화될 때 자동으로 과거의 미실행 간격을 실행하는 기능이다.catchup을 True로 설정한 경우 dag를 일시적으로 비활성화했다가 다시 활성화하면 start_date 이후로 그 동안 실행되지 않았던 dag들이 자동으로 실행된다. backfill은 특정 기간의 과거 실행을 수동으로 트리거하는 기능으로 start_date 이전의 데이터를 처리하기 원할 때 사용한다.backfill은 cli를 통해 수행된다. catchup=True examplestart_date와 현재 시간 사이에서 실행되지 않았던 dag들이 한번에 모두 실행된다.airflow의 설정값인 max_active_runs_per_dag(default 16) 값만큼 동시에 실행되기에 서버 부하가 발생할 수 있다.이를 방.. 2024. 8. 10. airflow 실행 날짜 이해: DAG 실행 시간과 execution_date 비교 airflow에는 스케줄링을 위한 여러 가지의 날짜 개념이 존재하며, 대표적으로 start_date, end_date, execution_date 등이 있다. 일반적으로는 이해하는 실행 날짜에 대한 개념은 아래와 같다.start_date스케줄링된 job이 처음 실행되는 시간end_date마지막 job이 실행되는 시간execution_date 실제 job이 실행된 시간 하지만 airflow에서는 다른 의미로 사용된다. airflow에서의 실행 날짜의 기준은 job이 스케줄링된 시간이다.따라서 실제 의미는 아래와 같다.start_datejob을 처음 예약하는 시간end_date마지막 job이 예약되는 시간execution_date job을 처음 예약하는 시간, 데이터 처리의 기준점으로 사용e.g. 만약 매일.. 2024. 8. 9. airflow 스케줄링 : preset, cron, 빈도 기반 airflow를 스케줄링하는 방법으로는 preset 기반, cron 기반, 빈도 기반 3가지가 있다. 1. preset 기반 스케줄링프리셋은 cron 표현식의 대안으로 사용되며, 더 읽기 쉽고 이해하기 쉽다. 주요 프리셋들과 그 의미는 아래와 같다.이름의미비고None스케줄링하지 않고, 외부에서 트리거될 때만 실행한다. @once1번만 실행한다. @continuous이전 실행이 끝나자마자 다시 실행한다.max_active_runs=1 일때만 사용 가능@hourly매 시 정각에 한번씩 실행한다. @daily매일 자정에 1회 실행한다. @weekly매주 일요일 자정에 1회 실행한다. @monthly매월 1일 정각에 한번 실행한다. @quarterly분기별로 1회 첫날 자정에 실행한다. @yearly매년 1월 .. 2024. 8. 9. airflow dag의 구조 bashoperator와 pythonoperator를 사용하는 dag를 생성한다. 각 요소에 대한 자세한 설명은 주석을 확인한다. import pendulumfrom datetime import timedeltaimport timefrom airflow import DAGfrom airflow.operators.bash import BashOperatorfrom airflow.operators.python import PythonOperatortz = pendulum.timezone("Asia/Seoul") # dag 스케쥴링 기준 timezone을 설정합니다.default_args = { 'owner': 'kyeongseo.oh', # dag의 관리자를 명시합니다. 'retries': 1, .. 2024. 6. 22. 이전 1 2 3 다음 반응형