본문 바로가기
aiflow

airflow sensor: poke와 reschedule

by kyeongseo.oh 2024. 8. 13.

1. Sensor 개요

Sensor는 Airflow에서 특정 조건이 충족될 때까지 기다리는 operator이다. 주요 특징은 다음과 같다.

  • 정의된 조건이 True가 될 때까지 주기적으로 확인한다.
  • 조건이 충족되면 다음 태스크를 실행할 수 있도록 한다.
  • 지정된 timeout까지 조건이 충족되지 않으면 실패한다.

2. Sensor의 실행 모드

Sensor는 두 가지 주요 실행 모드를 가진다: poke와 reschedule

2.1 Poke 모드

  • 기본 실행 모드이다.
  • 특징:
    • 전체 실행 시간 동안 worker slot을 점유한다.
    • 조건 체크 사이에 sleep 상태로 존재한다.
  • 다음과 같이 실행되는 동안 SLOT을 점유한다.

  • 로그 예시:
[2024-08-17, 10:26:30 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:26:40 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:26:50 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:27:00 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:27:10 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:27:20 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:27:30 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>

2.2 Reschedule 모드

  • 자원을 더 효율적으로 사용하는 모드이다.
  • 특징:
    • 조건이 충족되지 않으면 worker slot을 해제한다.
    • 다음 체크를 위해 태스크를 reschedule한다.
    • up_for_reschedule 상태일 때는 slot을 사용하지 않는다.
    • 내부적으로 poke를 호출하기에 poke_interval을 사용한다.
  • up_for_reschedule 상태의 TASK
  • 로그 예시:
[2024-08-17, 10:28:23 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:28:24 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-08-17, 10:28:24 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-08-17, 10:28:35 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-08-17, 10:28:36 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:28:36 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-08-17, 10:28:36 UTC] {taskinstance.py:340} ▶ Post task execution logs
[2024-08-17, 10:28:47 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-08-17, 10:28:48 UTC] {python.py:75} INFO - Poking callable: <function sensor_test.<locals>.check_file_exists at 0x7ff52e55d940>
[2024-08-17, 10:28:48 UTC] {taskinstance.py:309} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-08-17, 10:28:48 UTC] {taskinstance.py:340} ▶ Post task execution logs

3. Sensor 사용법

Airflow에서는 다양한 방법으로 Sensor를 사용할 수 있다:

3.1 내장 Sensor 사용

FileSensor 예제:

   file_sensor = FileSensor(
       task_id="file_sensor",
       filepath="/home/airflow/test1.txt",
       poke_interval=10,
       timeout=60,
       soft_fail=True
   )

 

PythonSensor 예제:

   def check_file():
       import os
       return os.path.exists("/home/airflow/test1.txt")

   python_sensor = PythonSensor(
       task_id="python_sensor",
       python_callable=check_file,
       poke_interval=10,
       timeout=60,
       soft_fail=True
   )

 

 

3.2 Task Decorator를 사용한 Sensor

@task.sensor(mode='poke', poke_interval=10, timeout=60, soft_fail=True)
def check_file_exists():
    import os
    return os.path.exists("/home/airflow/test1.txt")

4. 주요 파라미터 설명

  • poke_interval : 조건 체크 간격 (초 단위)
  • timeout : Sensor의 최대 실행 시간
  • soft_fail : True로 설정 시, timeout 발생 시 실패 대신 건너뛰기 처리
  • mode : 'poke' 또는 'reschedule' 선택

5. 전체 DAG 예제

import pendulum
from airflow.decorators import dag, task
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.python import PythonSensor

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "sensor"],
)
def sensor_test():
    @task.sensor(mode='poke', poke_interval=10, timeout=60, soft_fail=True)
    def check_file_exists():
        import os
        return os.path.exists("/home/airflow/test1.txt")

    file_sensor = FileSensor(task_id="file_sensor", filepath="/home/airflow/test1.txt", poke_interval=10, timeout=60, soft_fail=True)

    def check_file():
        import os
        return os.path.exists("/home/airflow/test1.txt")    

    python_sensor = PythonSensor(task_id="python_sensor", python_callable=check_file, poke_interval=10, timeout=60, soft_fail=True)

    @task
    def process_file():
        print("File processed successfully")

    [check_file_exists(), file_sensor, python_sensor] >> process_file()

sensor_test()

이 예제는 세 가지 다른 방식으로 구현된 Sensor를 사용하여 파일의 존재 여부를 확인하고, 파일이 존재하면 처리 작업을 수행한다.

6. 베스트 프랙티스

  1. 적절한 poke_intervaltimeout 설정으로 리소스 사용을 최적화한다.
  2. 장시간 실행되는 Sensor의 경우 reschedule 모드를 고려한다.
  3. 중요하지 않은 Sensor에는 soft_fail=True를 사용하여 전체 파이프라인 실패를 방지한다.

댓글