본문 바로가기
aiflow

airflow에서 remote spark cluster에 job submit 하기: SparkSubmitOperator

by kyeongseo.oh 2024. 9. 3.

airflow에서 SparkSubmitOperator와 task decorator를 사용해 remote spark cluster에 job을 submit하는 방법을 알아본다.

spark on yarn 환경으로 spark 3.3.2, hadoop 3이 설치되어 있다.

 

1. spark binary 다운로드

airflow에서 remote spark에 job을 submit 하기 위해서는 airflow가 설치된 서버에 spark-submit 호출이 가능해야 한다.

이를 위해 spark binary를 다운받아 spark-submit이 가능하도록 했다.

spark 3.3.2, hadoop 3을 사용하고 있어 spark-3.3.2-bin-hadoop3를 다운받았다.

## binary 다운로드
wget https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

## 압축 해제
tar -zxvf spark-3.3.2-bin-hadoop3.tgz

 

2. hadoop configuration file 복사

hadoop 클러스터에 대한 접속정보를 가지고 있는 설정 파일을 airflow 서버로 복사해준다. airflow는 설정파일의 내용을 바탕으로 hadoop 클러스터에 job을 제출한다. core-site.xml, hdfs-site.xml, yarn-site.xml 3개의 설정 파일을 복사했다.

scp root@10.0.1.60:/etc/hadoop/conf/core-site.xml .
scp root@10.0.1.60:/etc/hadoop/conf/hdfs-site.xml .
scp root@10.0.1.60:/etc/spark3/conf.cloudera.spark3_on_yarn/yarn-conf/yarn-site.xml .

 

3. 환경변수 설정 및 airflow 재시작

airflow가 설정파일과 spark-submit의 위치를 알 수 있도록 SPARK_HOME과 HADOOP_CONF_DIR을 설정해 준다.

/etc/sysconfig/airflow에 아래 내용을 추가한다. 추가한 설정을 airflow가 인식하도록 airflow를 재시작 해야 한다.

SPARK_HOME=/home/airflow/spark3/spark-3.3.2-bin-hadoop3
HADOOP_CONF_DIR=/opt/airflow/hadoop

 

4. hdfs에 airflow user 디렉토리 생성

spark job이 airflow user로 전달되기에 hdfs에 airflow user가 없으면 permission denied가 발생한다. 이를 방지하기 위해 hdfs에 airflow user 디렉토리를 추가해 user를 등록한다.

sudo -u hdfs hdfs dfs -mkdir /user/airflow
sudo -u hdfs hdfs dfs -chown airflow:airflow /user/airflow

 

5. spark connection 생성하기

아래와 같이 spark connection을 생성한다. Host를 yarn으로 작성하면 yarn-site.xml을 통해 yarn 정보를 확인하고, spark binary의 spark-submit은 SPARK_HOME 환경 변수를 통해 위치를 확인한다.

 

6. SparkSubmitOperator를 사용해 spark job 제출하기

아래는 SparkSubmitOperator를 사용해 Pi를 계산하는 예제이다. 실행할 jar 파일과 java class를 지정한 후 DAG를 실행하면 spark job이 제출된다.

여러 hadoop cluster가 있고, 경우에 따라 다른 cluster에 job을 제출해야 하는 경우에는 아래와 같이 `os.environ`을 사용해 DAG 파일 내부에서 SPARK_HOME과 HADOOP_CONF_DIR 환경변수를 설정해주면 해당 cluster로 job이 전달된다.

import os
import pendulum
from airflow.decorators import dag, task
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# os.environ["SPARK_HOME"] = "/home/airflow/spark3/spark-3.3.2-bin-hadoop3"
# os.environ["HADOOP_CONF_DIR"] = "/opt/airflow/hadoop"

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    max_active_tasks=3,
    tags=["example", "remote spark"],
)
def submit_spark():

    spark_submit = SparkSubmitOperator(
        task_id="spark_submit",
        name="spark_job_by_airflow",
        conn_id="dd-spark",
        application='/home/airflow/spark3/spark-3.3.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.3.2.jar',
        java_class='org.apache.spark.examples.SparkPi',
    )

    spark_submit

submit_spark()

 

yarn ResourceManager 웹 UI에서 job이 submit 된 것을 확인할 수 있다.

 

로그의 경우에는 airflow에 남지 않고, yarn에 기록된다. airflow에는 아래와 같이 tracking URL이 기록되고, 해당 URL로 접속하면 yarn에 기록된 로그를 확인할 수 있다.

[2024-09-03, 12:37:04 UTC] {spark_submit.py:550} INFO - Identified spark application id: application_1725281056228_0022
[2024-09-03, 12:37:04 UTC] {spark_submit.py:579} INFO - tracking URL: http://hdm3.datalake.net:8088/proxy/application_1725281056228_0022/
[2024-09-03, 12:37:04 UTC] {spark_submit.py:579} INFO - user: airflow

 

7. SparkSubmitOperator를 사용해 pyspark job 제출하기

아래 예제는 pyspark을 사용해 dataframe을 생성하고, dataframe의 내용을 csv 포맷으로 hdfs에 기록하는 예제이다.

application 파라미터에 실행할 pyspark script 경로를 입력한다.

import pendulum
from airflow.decorators import dag, task
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

@dag(
    schedule="@once",
    start_date=pendulum.datetime(2024, 8, 10, 3, tz=pendulum.timezone("Asia/Seoul")),
    catchup=False,
    max_active_tasks=3,
    tags=["example", "remote spark", "pyspark"],
)
def submit_pyspark():
    spark_submit = SparkSubmitOperator(
        task_id="spark_submit",
        name="pyspark_job_by_airflow",
        conn_id="dd-spark",
        application='/opt/airflow/dags/pyspark_script.py',  # pyspark 스크립트 경로
        # py_files='/path/to/additional_module.py',  # 추가 Python 모듈 (옵션)
    )
    spark_submit

submit_pyspark()

 

아래는 실행할 pyspark 코드로, dataframe 생성 후 해당 내용을 hdfs의 /user/airflow/pyspark_job 경로에 저장한다.

from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.appName("pyspark_airflow").getOrCreate()
  
    data = [("AI", 25, 5000),
            ("BI", 30, 6500),
            ("CI", 35, 7000),
            ("DI", 40, 8500),
            ("EI", 45, 9000)]

    columns = ["name", "age", "salary"]

    df = spark.createDataFrame(data, columns)

    print("print dataframe : ")
    df.show()

    hdfs_path = "hdfs:///user/airflow/pyspark_job"
    df.coalesce(1).write.mode("overwrite").csv(hdfs_path, header=True)

if __name__ == "__main__":
    main()

 

yarn ResourceManager 웹 UI에서 job이 submit 된 것을 확인할 수 있다. 동일하게 로그는 tracking URL에 접근해 확인해야 한다.

 

hdfs에서 확인해보면 아래와 같이 지정한 경로에 파일이 생성된 것을 확인할 수 있다.

[root@adm ~]# hdfs dfs -ls /user/airflow/pyspark_job/
Found 2 items
-rw-r--r--   3 airflow airflow          0 2024-09-03 21:50 /user/airflow/pyspark_job/_SUCCESS
-rw-r--r--   3 airflow airflow         71 2024-09-03 21:50 /user/airflow/pyspark_job/part-00000-2dcf8279-4af6-46da-8b38-4c9af3cb529b-c000.csv
[root@adm ~]# hdfs dfs -cat /user/airflow/pyspark_job/part-00000-2dcf8279-4af6-46da-8b38-4c9af3cb529b-c000.csv
name,age,salary
AI,25,5000
BI,30,6500
CI,35,7000
DI,40,8500
EI,45,9000

 

 

댓글