1. Sensor 개요
Sensor는 Airflow에서 특정 조건이 충족될 때까지 기다리는 operator이다. 주요 특징은 다음과 같다.
- 정의된 조건이
True
가 될 때까지 주기적으로 확인한다. - 조건이 충족되면 다음 태스크를 실행할 수 있도록 한다.
- 지정된 timeout까지 조건이 충족되지 않으면 실패한다.
2. Sensor의 실행 모드
Sensor는 두 가지 주요 실행 모드를 가진다: poke와 reschedule
2.1 Poke 모드
- 기본 실행 모드이다.
- 특징:
- 전체 실행 시간 동안 worker slot을 점유한다.
- 조건 체크 사이에 sleep 상태로 존재한다.
- 다음과 같이 실행되는 동안 SLOT을 점유한다.
- 로그 예시:
[2024-08-17, 10:26:30 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:26:40 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:26:50 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:27:00 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:27:10 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:27:20 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:27:30 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
2.2 Reschedule 모드
- 자원을 더 효율적으로 사용하는 모드이다.
- 특징:
- 조건이 충족되지 않으면 worker slot을 해제한다.
- 다음 체크를 위해 태스크를 reschedule한다.
up_for_reschedule
상태일 때는 slot을 사용하지 않는다.- 내부적으로 poke를 호출하기에 poke_interval을 사용한다.
- up_for_reschedule 상태의 TASK
- 로그 예시:
[2024-08-17, 10:28:23 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:28:24 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-08-17, 10:28:24 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-08-17, 10:28:35 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-08-17, 10:28:36 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:28:36 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-08-17, 10:28:36 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-08-17, 10:28:47 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-08-17, 10:28:48 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:28:48 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-08-17, 10:28:48 UTC] {taskinstance.py:340} ▶ Post task execution logs
3. Sensor 사용법
Airflow에서는 다양한 방법으로 Sensor를 사용할 수 있다:
3.1 내장 Sensor 사용
FileSensor 예제:
file_sensor = FileSensor(
task_id="file_sensor",
filepath="/home/airflow/test1.txt",
poke_interval=10,
timeout=60,
soft_fail=True
)
PythonSensor 예제:
def check_file():
import os
return os.path.exists("/home/airflow/test1.txt")
python_sensor = PythonSensor(
task_id="python_sensor",
python_callable=check_file,
poke_interval=10,
timeout=60,
soft_fail=True
)
3.2 Task Decorator를 사용한 Sensor
@task.sensor(mode='poke', poke_interval=10, timeout=60, soft_fail=True)
def check_file_exists():
import os
return os.path.exists("/home/airflow/test1.txt")
4. 주요 파라미터 설명
poke_interval
: 조건 체크 간격 (초 단위)timeout
: Sensor의 최대 실행 시간soft_fail
:True
로 설정 시, timeout 발생 시 실패 대신 건너뛰기 처리mode
: 'poke' 또는 'reschedule' 선택
5. 전체 DAG 예제
import pendulum
from airflow.decorators import dag, task
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.python import PythonSensor
@dag(
schedule="@once",
start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
catchup=False,
tags=["example", "sensor"],
)
def sensor_test():
@task.sensor(mode='poke', poke_interval=10, timeout=60, soft_fail=True)
def check_file_exists():
import os
return os.path.exists("/home/airflow/test1.txt")
file_sensor = FileSensor(task_id="file_sensor", filepath="/home/airflow/test1.txt", poke_interval=10, timeout=60, soft_fail=True)
def check_file():
import os
return os.path.exists("/home/airflow/test1.txt")
python_sensor = PythonSensor(task_id="python_sensor", python_callable=check_file, poke_interval=10, timeout=60, soft_fail=True)
@task
def process_file():
print("File processed successfully")
[check_file_exists(), file_sensor, python_sensor] >> process_file()
sensor_test()
이 예제는 세 가지 다른 방식으로 구현된 Sensor를 사용하여 파일의 존재 여부를 확인하고, 파일이 존재하면 처리 작업을 수행한다.
6. 베스트 프랙티스
- 적절한
poke_interval
과timeout
설정으로 리소스 사용을 최적화한다. - 장시간 실행되는 Sensor의 경우
reschedule
모드를 고려한다. - 중요하지 않은 Sensor에는
soft_fail=True
를 사용하여 전체 파이프라인 실패를 방지한다.
'aiflow' 카테고리의 다른 글
airflow 외부 시스템 이용하기 -1 : DockerOperator, KubernetesPodOperator, SparkKubernetesOperator(SKO) (0) | 2024.08.17 |
---|---|
airflow ExternalTaskSensor와 TriggerDagRunOperator (0) | 2024.08.17 |
airflow Taskflow api: @dag, @task (0) | 2024.08.11 |
airflow Xcom (0) | 2024.08.11 |
airflow trigger rule (1) | 2024.08.11 |
댓글