본문 바로가기
aiflow

airflow 외부 시스템 이용하기 -1 : DockerOperator, KubernetesPodOperator, SparkKubernetesOperator(SKO)

by kyeongseo.oh 2024. 8. 17.

airflow에서 docker, kubernetes, spark 등의 외부 시스템을 이용하는 방법을 설명한다.


1. 원격 docker daemon과 연동하기

원격 docker daemon에 연동하기 위해서는 우선 docker 관련 설정이 필요하다.

docker 서버(daemon) 설정과 docker client(airflow 서버) 설정으로 구분할 수 있다.

 

  • docker 서버 설정 (docker daemon에서 진행)

우선 docker daemon을 원격 접속할 수 있도록 설정한다. daemon.json과 docker.service 파일을 수정해야한다.

 

vi /etc/docker/daemon.json
{
  "exec-opts": ["native.cgroupdriver=systemd"],
  "hosts": ["tcp://0.0.0.0:2375", "unix:///var/run/docker.sock"]
}

 

 

vi /lib/systemd/system/docker.service

 

ExecStart=/usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock

위를 아래처럼 변경 (-H fd:// 를 제거한다)

ExecStart=/usr/bin/dockerd --containerd=/run/containerd/containerd.sock

 

 

파일 수정이 완료되었다면 docker 서비스 파일의 변경된 설정을 적용한 후 docker를 재시작 해준다.

docker가 재시작되면 `docker -H 10.0.1.111:2375 ps` 와 같이 remote-host 주소를 입력해 명령을 실행할 수 있다.

systemctl daemon-reload
systemctl restart docker

 

  • docker client 설정 (airflow 서버에서 진행)

docker client에서는 provider만 설치해주면 된다. docker connection은 airflow restart 후 생성이 가능하다.

pip install apache-airflow-providers-docker

 

만약 airflow 서버에서 docker cli를 사용하고자 하는 경우에는 아래 커맨드를 통해 docker cli를 설치한다.

# docker cli 설치
sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
sudo yum install -y docker-ce-cli

# 원격 docker daemon에 명령 전달
export DOCKER_HOST="tcp://10.0.1.111:2375"
docker ps

 

  • DockerOperator를 사용해 DAG 개발

DockerOperator를 사용하면 격리된 환경에서 task가 실행되므로, 의존성 충돌을 방지할 수 있고, docker를 사용해 각 task의 리로스 사용을 제한하고, 관리할 수 있다. 또한 여러 이미지를 통해 python, r, java 등 다양한 언어 및 프레임워크를 워크플로우에 활용할 수 있다.

import pendulum
from airflow.decorators import dag, task
from airflow.providers.docker.operators.docker import DockerOperator

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "docker_op"],
)
def docker_operator():

    docker_op = DockerOperator(
        task_id='echo_container',
        image='ubuntu:22.04', # 컨테이너 이미지
        docker_url="tcp://10.0.1.111:2375", # 원격 docker daemon 주소
        container_name='echo_container', # 컨테이너 이름
        auto_remove='force', # 컨테이너 종료 시 컨테이너를 무조건 삭제
        command=["/bin/sh", "-c", "sleep 30 && whoami"], # 컨테이너 실행 명령어
    )

    docker_op

docker_operator()

 

DAG에 선언한 container_name과 동일한 이름으로 container가 생성된 것을 확인할 수 있다.


2. kubernetes와 연동하기

kubernetes와 airflow를 연동하기 위해서는 kubernetes cluster의 config 파일을 airflow 서버로 복사해줘야 한다.

~/.kube/config 파일을 airflow 서버에 복사한 후 provider를 설치해 KubernetesPodOperator를 사용할 준비를 한다.

pip install apache-airflow-providers-cncf-kubernetes

 

KubernetesPodOperator을 configmap, secret, volume 등을 사용해 설정 관리, 민감한 정보 보호, 그리고 데이터 영속성을 효과적으로 처리할 수 있다.

import pendulum
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "k8s_op"],
)
def kubernetes_operator():

    k8s_op = KubernetesPodOperator(
        task_id="k8s_op",
        image="r-base", # 실행에 사용할 이미지
        namespace="test", # pod를 실행할 namespace 지정
        cmds=["Rscript", "-e"],
        arguments=["""
        print('kubernetespodoperator R test')
        Sys.sleep(10)
        print('R script executed successfully')
        """],
        in_cluster=False, # airflow가 k8s 클러스터 내부에서 실행중인지
        cluster_context=None, # 클러스터 컨텍스트를 지정 None이면 current-context를 사용
        config_file="/opt/airflow/.kube/config" , # defalut는 `~/.kube/config`
        get_logs=True, # stdout을 task의 log로 남긴다.
    )

    k8s_op

kubernetes_operator()

 

DAG가 트리거되면 K8S cluster에 pod가 생성된다.


3. spark 연동하기

SparkKubernetesOperator을 사용해 airflow에서 spark job을 수행하는 방법을 알아보자.

이 operator를 사용하면 kubernetes 위에서 spark job이 구동된다. airflow 서버에 provider를 설치한 후에 사용 가능하다.

pip install apache-airflow-providers-cncf-kubernetes

 

  • Kubernetes cluster에 필요한 설정

kubernetes cluster에는 spark operator가 설치되어 있어야 한다. 아래 커맨드를 통해 설치한다.

helm repo add spark-operator https://kubeflow.github.io/spark-operator
helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace --set webhook.enable=true

 

아래와 같이 출력되면 설치가 잘 끝났다고 보면 된다.

[root@ks1 ~]# k get pod -n spark-operator
NAME                              READY   STATUS    RESTARTS   AGE
spark-operator-7dcb44748b-rmdl6   1/1     Running   0          2m55s

 

그 다음 서비스 어카운트를 생성한 후 필요한 권한을 부여한다. 권한이 없으면 airflow에서 던진 spark job은 실패한다.

kubectl create serviceaccount spark-operator-spark -n spark-operator
kubectl create clusterrolebinding spark-operator-role --clusterrole=edit --serviceaccount=spark-operator:spark-operator-spark --namespace=spark-operator

 

  • SparkKubernetesOperator 사용하기

DAG를 생성하기에 앞서 SparkKubernetesOperator에서 사용할 Spark application yaml을 작성한다.

yaml에는 사용할 spark image와 실행할 jar 파일, mainClass, 리소스, 서비스 어카운트 등의 정보를 입력한다.

# spark-pi.yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-operator
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.0 # 사용하는 이미지
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar # 이미지 내의 jar 파일
  sparkVersion: 3.5.0
  driver:
    labels:
      version: 3.5.0
    cores: 1
    coreLimit: 1000m
    memory: 512m
    serviceAccount: spark-operator-spark # 서비스 어카운트
  executor:
    labels:
      version: 3.5.0
    instances: 2 # executor의 수
    cores: 1 # cpu requests 
    coreLimit: 1000m # cpu limit
    memory: 512m

 

이제 SparkKubernetesOperator를 사용한 DAG를 생성한다. application_file에 위에서 작성한 yaml 파일의 경로를 작성해주면 된다.

import pendulum
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "sparkOnK8s"],
)
def sparkOnK8s():

    sko = SparkKubernetesOperator(
        task_id="sko" ,
        config_file="/opt/airflow/.kube/config",
        in_cluster=False,
        namespace="spark-operator",
        application_file="spark-pi.yaml"
    )

    sko
  
sparkOnK8s()

 

위 DAG를 실행하면 Spark Operator가 작동하여 Spark driver pod와 executor pod를 생성되고 main 메소드가 실행된다.

[root@ks1 spark]# k get pod -n spark-operator
NAME                               READY   STATUS    RESTARTS   AGE
sko-fg4i5gvh-driver                1/1     Running   0          39s
spark-operator-7dcb44748b-rmdl6    1/1     Running   0          13h
spark-pi-42fd449169f9d579-exec-1   1/1     Running   0          3s
spark-pi-42fd449169f9d579-exec-2   1/1     Running   0          3s

 


4. spark 연동하기 - git sync

3. spakr 연동하기처럼 사용하면 spark 코드에 변경점이 발생할 경우 매번 새로운 이미지를 빌드해야하는 불편함이 있다. 물론 CI / CD 환경이 구축되어 있다면 큰 문제는 되지 않겠지만, 이미지 버전 관리하는 것도 귀찮으니 git을 사용해서 작업해보자.

 

  • git-credentials 시크릿 생성하기 (k8s cluster에서 작업)

private repo에 접근하기 위해 git username 과 token을 base64해 secret을 생성한다.

[root@ks1 spark]# echo 'kyeongseooh' |  tr -d '\n' | base64
a3llb25nc2Vvb2g=

 

k8s cluster에 아래와 같이 yaml을 사용해 secret을 생성한다.

apiVersion: v1
kind: Secret
metadata:
  name: git-credentials
  namespace: spark-operator
data:
  username: a3llb25nc2Vvb2g=
  password: Z2hwXzAyNXuuuuuuFbUdoZExxxxxxxxxxxxxxxxxx==

 

  • SparkKubernetesOperator 사용하기

https://github.com/kyeongseooh/pyspark_job 를 clone하는 initContainers를 추가해 pod가 생성되면 git을 clone하도록 하고, git에 올라가 있는 spark code를 실행하는 Spark application yaml을 작성했다.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: pyspark-pi
  namespace: spark-operator
spec:
  type: Python
  mode: cluster
  image: apache/spark-py:v3.4.0
  imagePullPolicy: IfNotPresent
  pythonVersion: "3"
  mainApplicationFile: local:///opt/spark_apps/pyspark_test.py
  sparkVersion: 3.4.0
  restartPolicy:
    type: Never
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 3.4.0
    serviceAccount: spark-operator-spark
    initContainers:
    - name: git-sync
      image: alpine/git
      command:
        - "/bin/sh"
        - "-c"
      args:
        - |
          git clone https://${GIT_USERNAME}:${GIT_PASSWORD}@github.com/kyeongseooh/pyspark_job.git /opt/spark_apps
          ls -al /opt/spark_apps
      env:
        - name: GIT_USERNAME
          valueFrom:
            secretKeyRef:
              name: git-credentials
              key: username
        - name: GIT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: git-credentials
              key: password
      volumeMounts:
        - name: spark-apps
          mountPath: /opt/spark_apps
    volumeMounts:
      - name: spark-apps
        mountPath: /opt/spark_apps
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    initContainers:
    - name: git-sync
      image: alpine/git
      command:
        - "/bin/sh"
        - "-c"
      args:
        - |
          git clone https://${GIT_USERNAME}:${GIT_PASSWORD}@github.com/kyeongseooh/pyspark_job.git /opt/spark_apps
      env:
        - name: GIT_USERNAME
          valueFrom:
            secretKeyRef:
              name: git-credentials
              key: username
        - name: GIT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: git-credentials
              key: password
      volumeMounts:
        - name: spark-apps
          mountPath: /opt/spark_apps
    volumeMounts:
      - name: spark-apps
        mountPath: /opt/spark_apps
  volumes:
    - name: spark-apps
      emptyDir: {}

 

 

이제 SparkKubernetesOperator를 사용한 DAG를 생성한다. application_file에 위에서 작성한 yaml 파일의 경로를 작성해주면 된다.

import pendulum
from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    tags=["example", "sparkOnK8s"],
)
def sparkOnK8s():

    sko = SparkKubernetesOperator(
        task_id="sko-git" ,
        config_file="/opt/airflow/.kube/config",
        in_cluster=False,
        namespace="spark-operator",
        application_file="spark-job.yaml",
        delete_on_termination=True
    )

    sko
  
sparkOnK8s()

 

위 DAG를 실행하면 Spark Operator가 작동하여 Spark driver pod와 executor pod를 생성되고 main 메소드가 실행된다.

[root@ks1 spark]# k get pod -n spark-operator
NAME                                                 READY   STATUS    RESTARTS   AGE
sko-git-m51ejeef-driver                              1/1     Running   0          19s
spark-operator-7dcb44748b-q99wf                      1/1     Running   0          20h
sparkapplicationpysparkjob-c66e9f916f2f1239-exec-1   1/1     Running   0          9s

 

아래와 같이 정상적으로 spark job이 처리되었음을 확인할 수 있다.

댓글