본문 바로가기

분류 전체보기106

airflow Data-aware scheduling과 Dataset 1. Data-aware scheduling 개요데이터 셋 업데이트를 기반으로 DAG를 스케줄링할 수 있는 기능이다.아래와 같이 Denpendency Graph를 통해 데이터 셋을 업데이트하는 DAG와 데이터 셋 변경에 의해 트리거된 DAG를 시각적으로 확인할 수 있다.2. Dataset 개념데이터 업데이트를 알리고 이를 기반으로 워크플로우를 트리거하는 역할을 한다.데이터의 논리적 그룹을 나타내는 추상적인 개념으로 실제 데이터를 저장하거나, 관리하지 않는다.URI(Uniform Resource Identifier)로 정의된다. 이 URI는 데이터의 위치나 식별자 역할을 하지만, 실제 데이터를 포함하지는 않는다.3. Dataset 사용 방법3-1. Dataset 생성 방법from airflow.datase.. 2024. 8. 17.
airflow Params 1. Airflow Params 개요DAG와 태스크에 동적으로 값을 전달하는 방법이다.런타임에 추가 Params를 제공하거나 Params 값을 덮어쓸 수 있어 DAG의 유연성이 높아진다.스케줄링된 DAG는 Params에 설정된 기본값을 사용한다.DAG를 수동으로 시작하면 Params를 수정해 DAG를 실행할 수 있다.동일한 DAG를 다른 파라미터로 여러 번 실행할 수 있어 DAG의 재사용성이 높아진다.2. Params를 사용 사례특정 날짜의 데이터를 재처리 하는 CASE가 있는 경우, 날짜를 Params로 지정한다.개발 / 테스트 / 운영 환경에 따라 다른 database를 사용하는 경우, 연결 정보를 Params로 지정한다.파라미터를 변경하며, 성능 테스트를 진행하는 경우, 파라미터를 Params로 지.. 2024. 8. 17.
airflow 외부 시스템 이용하기 -2: ImpalaHook, S3Hook impala, s3 등의 외부 시스템을 이용하는 방법을 설명한다.1. Impala 연동하기impala provider 설치Impala connection을 생성하기 위해 provider를 설치합니다.apache-airflow-providers-apache-impala connection 생성하기host에는 impala coordinator의 주소를 입력합니다. coordinator가 없다면 impala daemon 중 하나의 주소를 입력합니다. Impala와 연동하기Impala provider는 operator를 제공하지 않고, hook만 제공하기에, hook을 사용해 Impala에 쿼리를 실행해야 합니다.import pendulumfrom airflow.decorators import dag, task.. 2024. 8. 17.
airflow 외부 시스템 이용하기 -1 : DockerOperator, KubernetesPodOperator, SparkKubernetesOperator(SKO) airflow에서 docker, kubernetes, spark 등의 외부 시스템을 이용하는 방법을 설명한다.1. 원격 docker daemon과 연동하기원격 docker daemon에 연동하기 위해서는 우선 docker 관련 설정이 필요하다.docker 서버(daemon) 설정과 docker client(airflow 서버) 설정으로 구분할 수 있다. docker 서버 설정 (docker daemon에서 진행)우선 docker daemon을 원격 접속할 수 있도록 설정한다. daemon.json과 docker.service 파일을 수정해야한다. vi /etc/docker/daemon.json{ "exec-opts": ["native.cgroupdriver=systemd"], "hosts": ["tc.. 2024. 8. 17.
airflow ExternalTaskSensor와 TriggerDagRunOperator ExternalTaskSensorDAG 간 의존성이 존재할 경우에 사용되는 sensor로 다른 DAG의 특정 태스크가 완료되었는 지를 확인하는 역할을 수행한다.ExternalTaskSensor은 Logical Date를 기준으로 upstream DAG의 수행을 판단한다. 스케줄링된 시간이 다르거나, 수동으로 DAG를 트리거 한 경우에는 Sensor가 제대로 동작하지 않게 된다. 아래와 같이 DAG의 Logical Date가 동일한 경우에만 ExternalTaskSensor는 upstream DAG가 수행된 것으로 판단한다는 점에 주의하자.  ExternalTaskSensor 예제 코드import pendulumfrom airflow.decorators import dag, taskfrom airflow... 2024. 8. 17.
airflow sensor: poke와 reschedule 1. Sensor 개요Sensor는 Airflow에서 특정 조건이 충족될 때까지 기다리는 operator이다. 주요 특징은 다음과 같다.정의된 조건이 True가 될 때까지 주기적으로 확인한다.조건이 충족되면 다음 태스크를 실행할 수 있도록 한다.지정된 timeout까지 조건이 충족되지 않으면 실패한다.2. Sensor의 실행 모드Sensor는 두 가지 주요 실행 모드를 가진다: poke와 reschedule2.1 Poke 모드기본 실행 모드이다.특징:전체 실행 시간 동안 worker slot을 점유한다.조건 체크 사이에 sleep 상태로 존재한다.다음과 같이 실행되는 동안 SLOT을 점유한다.로그 예시:[2024-08-17, 10:26:30 UTC] {python.py:75} INFO - Poking c.. 2024. 8. 13.
airflow Taskflow api: @dag, @task 1. Taskflow API 소개Taskflow API는 Airflow에서 Python 함수를 간단하게 DAG 태스크로 변환할 수 있는 기능이다. 이 API를 사용하면 다음과 같은 이점이 있다.기존 Python 코드를 Airflow로 쉽게 이관할 수 있다.코드의 가독성과 유지보수성을 향상시킬 수 있다.DAG 구조를 더 직관적으로 표현할 수 있다.2. 기본 사용법Taskflow API의 핵심 데코레이터는 아래와 같다.@dag : DAG를 정의하는 데코레이터이다.@task : 개별 태스크를 정의하는 데코레이터이다.기본 예시import pendulumfrom airflow.decorators import dag, taskimport json@dag( schedule="@once", start_dat.. 2024. 8. 11.
airflow Xcom Xcom은 Cross-Communications의 약자로 task 간 데이터를 교환하는 데 사용되는 기능이다.key-value 형태로 key, value, timestamp, task_id, dag_id 등의 메타데이터가 db에 저장된다.xcom_push, xcom_pull 메소드를 사용해 Xcom을 push/pull 할 수 있으며, Operator의 반환 값과 taskflow에서 사용하는 argument들은 자동으로 Xcom에 저장된다. 일반적으로는 airflow 메타 디비에 저장되나, custom한 Xcom을 사용하면 s3 등의 별도 저장소에 저장할 수 있다. Xcom 사용 시 유의사항1. Xcom은 SQLAlchemy의 LargeBinary에 저장된다. - SQLite : blob type에 .. 2024. 8. 11.
airflow trigger rule Trigger Rule은 선행 태스크들의 상태에 따라 현재 태스크의 실행 여부를 결정하는 규칙이다.기본적으로 Airflow는 모든 직접적인 상위 태스크가 성공적으로 완료되어야 다음 태스크를 실행한다.하지만 Trigger Rule을 사용하면 이 기본 동작을 변경할 수 있어 좀 더 복잡한 dag를 설계할 수 있다. Airflow에서 제공하는 Trigger Rule의 종류는 아래와 같다. 아래에서 말하는 상위 task는 현재 task와 직접적으로 연결된 이전 태스크를 의미한다.Trigger RuleDescriptionall_success상위 task가 모두 성공하면 실행all_failed상위 task가 모두 실패하면 실행all_done상위 task가 모두 완료되면 실행 (성공 / 실패 여부 무관)all_ski.. 2024. 8. 11.
airflow task 의존성 task 의존성은 DAG(Directed Acyclic Graph) 내에서 task 간의 실행 순서와 관계를 정의하는 개념으로 airflow에서는비트시프트 연산자(>>)를 사용해 task 간의 의존성을 설정할 수 있다.task 의존성의 패턴으로는 선형 의존성, fan-out, fan-in, 조건부 의존성이 있다. 1. 선형 의존성import pendulumfrom airflow.decorators import dag, task@dag( schedule="@once", start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")), catchup=False, tags=["kyeongseo.oh", "ta.. 2024. 8. 11.
반응형