본문 바로가기
kubenetes

Kubernetes에서 Spark 사용하기: spark operator를 이용한 spark on kubernetes 설정 및 사용 방법

by kyeongseo.oh 2024. 9. 9.

spark on k8s 환경을 구축하는 방법과 간단한 사용 방법을 설명한다.

 

1. Spark Operator 소개

spark operator는 kubernetes 환경에서 spark application의 배포와 관리를 간소화하는 커스텀 컨트롤러이다.

crd에 따라 동적으로 driver pod과 executor pod가 실행된다.

 

주요 구성 요소는 다음과 같다.

1. Custom Resource Definitions (CRDs)

  • SparkApplication: 단일 spark application을 정의한다.
  • ScheduledSparkApplication: 주기적으로 실행되는 spark application을 정의한다.

2. operator

  • CRD를 감지하고 필요한 kubernetes 리소스를 생성, 업데이트, 삭제한다.
  • spark driver와 executor pod의 생성을 관리한다.

3. Admission Webhook

  • kubernetes api 서버에 요청이 도달하기 전에 요청을 가로채어 리소스에 필요한 설정을 주입한다.
  • spark application pod가 생성될 때 환경 변수, 볼륨 마운트 등 필요한 설정을 주입한다.

spark operator가 동작하는 방식은 아래와 같다.

  1. 사용자가 SparkApplication을 생성한다.
  2. operator가 이를 감지하고 kubernetes 리소스를 생성한다.
  3. spark driver pod가 생성된다.
  4. spark driver가 executor pod 생성을 요청하고, 관리한다.
  5. spark job이 종료되면 operator가 리소스를 정리한다.

2. Spark Operator 설치

helm을 사용해 spark-operator 네임스페이스에 spark operator를 설치한다.

helm repo add spark-operator https://kubeflow.github.io/spark-operator
helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace --set webhook.enable=true

 

아래와 같이 출력되면 정상적으로 설치가 된 것이다.

[root@ks1 ~]# k get pod -n spark-operator
NAME                              READY   STATUS    RESTARTS   AGE
spark-operator-7dcb44748b-rmdl6   1/1     Running   0          2m55s

 

3. ServiceAccount 생성

서비스 어카운트를 생성한 후 필요한 권한을 부여한다.

kubectl create serviceaccount spark-operator-spark -n spark-operator
kubectl create clusterrolebinding spark-operator-role --clusterrole=edit --serviceaccount=spark-operator:spark-operator-spark --namespace=spark-operator

 

4. 필요한 설정 파일 및 환경변수 설정

kubernetes cluster 외부에서 spark-submit을 실행하는 경우

  • kubernetes config 파일을 해당 서버의 ~/.kube/config로 복사한다.

spark에서 hdfs를 사용하는 경우

  • 서버에 hdfs-site.xml을 복사한 후 HADOOP_CONF_DIR 환경변수를 설정한다.
  • hdfs-site.xml이 있는 디렉토리를 HADOOP_CONF_DIR 환경변수로 설정하면 pod 생성 시 자동으로 설정이 볼륨 마운트 된다.

 

5. spark-submit 명령 실행

master에는 kubernetes control-plane 주소를 작성하고, 실행 파일은 spark:3.5.0 이미지 내부의 jar 혹은 .py 파일을 작성한다. 사용 가능한 configuration은 다음 링크를 참고한다. (https://spark.apache.org/docs/latest/running-on-kubernetes.html)

spark-submit --master k8s://10.0.1.111:6443 --name k8s-submit \
    --deploy-mode cluster \
    --driver-cores 1 \
    --driver-memory 512m \
    --num-executors 1 \
    --executor-cores 1 \
    --executor-memory 512m \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.kubernetes.namespace=spark-operator \
    --conf spark.kubernetes.container.image=spark:3.5.0 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-operator-spark \
    local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar

 

이 방식은 Spark Operator의 SparkApplication CRD를 사용하지 않고, pod 및 svc를 직접 생성하는 방식으로 application이 완료되면 executor pod는 자동으로 삭제되지만, driver pod는 로그를 유지하기 위해 Completed 상태를 유지된다.

Completed된 pod는 가비지 수집되거나, 수동으로 정리될 때까지 유지되며, 이 상태에서는 어떠한 리소스도 사용하지 않는다.

 

spark job이 kubernetes에 제출되고, driver와 executor가 모두 실행 중인 상태

[root@km ~]# k get pod -n spark-operator
NAME                                 READY   STATUS    RESTARTS   AGE
k8s-submit-9660f491d282f066-driver   1/1     Running   0          10s
spark-operator-7dcb44748b-ch4n5      1/1     Running   0          10d
spark-pi-23538e91d28305c5-exec-1     1/1     Running   0          4s

 

spark job이 성공적으로 종료된 후 driver가 Completed된 상태

[root@km ~]# k get pod -n spark-operator
NAME                                 READY   STATUS      RESTARTS   AGE
k8s-submit-9660f491d282f066-driver   0/1     Completed   0          12s
spark-operator-7dcb44748b-ch4n5      1/1     Running     0          10d

 

Completed 상태의 pod에서 spark job의 log를 확인할 수 있다.

[root@km ~]# k logs -n spark-operator k8s-submit-9660f491d282f066-driver | grep rough
Pi is roughly 3.147675738378692

 

Completed 상태의 pod를 한번에 삭제할 때는 아래 커맨드를 이용한다.

kubectl get pods -n spark-operator --field-selector=status.phase==Succeeded -o name | xargs -r kubectl delete -n spark-operator

 

6. SparkApplication CRD 사용

yaml 파일을 사용해 sparkapplication crd를 생성한다. executor instances 설정만큼 executor pod가 생성된다.

crd를 사용하면 ttl을 적용해 Completed된 pod를 일정 시간 이후 삭제할 수 있다.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-operator
spec:
  timeToLiveSeconds: 60 # Completed 상태의 pod를 60초 후에 삭제
  type: Scala
  mode: cluster
  image: spark:3.5.0 # 사용하는 이미지
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar # 이미지 내의 jar 파일
  sparkVersion: 3.5.0
  driver:
    labels:
      version: 3.5.0
    cores: 1
    coreLimit: 1000m
    memory: 512m
    serviceAccount: spark-operator-spark # 서비스 어카운트
  executor:
    labels:
      version: 3.5.0
    instances: 2 # executor의 수
    cores: 1 # cpu requests
    coreLimit: 1000m # cpu limit
    memory: 512m

 

yaml 파일을 적용한다.

[root@km spark]# k apply -f spark-pi.yaml
sparkapplication.sparkoperator.k8s.io/spark-pi created

 

yaml을 적용하면 아래와 같이 sparkapplication 리소스가 생성되고, ttl 이후 삭제된다.

[root@km spark]# k get sparkapplications.sparkoperator.k8s.io -n spark-operator
NAME       STATUS    ATTEMPTS   START                  FINISH       AGE
spark-pi   RUNNING   1          2024-09-08T19:32:50Z   <no value>   6s

 

7. ScheduledSparkApplication 사용

cron 표현식을 사용해 작업 실행 주기를 정의한다. ttl 설정은 spec.template에 적용한다.

concurrencyPolicy는 동시 실행 정책을 정의하며 Allow, Forbid, Replace 중 선택할 수 있다.

  • Allow: 여러 개의 SparkApplications가 동시에 실행될 수 있다.
  • Forbid: SparkApplications의 동시 실행을 금지하고, 이전 실행이 아직 완료되지 않은 경우 다음 실행을 건너뛴다.
  • Replace: 현재 실행 중인 SparkApplications을 종료하고 새로운 SparkApplications를 실행한다.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: ScheduledSparkApplication
metadata:
  name: spark-pi-scheduled
  namespace: spark-operator
spec:
  schedule: "*/1 * * * *"
  concurrencyPolicy: Allow  # Allow, Forbid, Replace 중 선택
  template:
    timeToLiveSeconds: 60
    type: Scala
    mode: cluster
    image: "spark:3.5.0"
    imagePullPolicy: IfNotPresent
    mainClass: org.apache.spark.examples.SparkPi
    mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar"
    sparkVersion: 3.5.0
    driver:
      cores: 1
      coreLimit: 1000m
      memory: 512m
      serviceAccount: spark-operator-spark
    executor:
      cores: 1
      instances: 1
      memory: "512m"

 

yaml을 적용한다.

[root@km spark]# k apply -f spark-pi-scheduled.yaml
scheduledsparkapplication.sparkoperator.k8s.io/spark-pi-scheduled created

 

cron 표현식에 따라 매 분 실행되며, ttl 이후 sparkapplication이 삭제된다.

[root@km spark]# k get sparkapplications.sparkoperator.k8s.io -n spark-operator
NAME                                     STATUS      ATTEMPTS   START                  FINISH                 AGE
spark-pi-scheduled-1725824760487512234   COMPLETED   1          2024-09-08T19:46:04Z   2024-09-08T19:46:19Z   65s
spark-pi-scheduled-1725824820487880434   RUNNING     1          2024-09-08T19:47:04Z   <no value>             5s

댓글