본문 바로가기

분류 전체보기111

pyspark aggregateByKey aggregateByKey 함수는 RDD의 모든 값을 결합하여 하나의 결과값을 생성한다. 입력 값과 다른 타입의 결과를 생성할 수 있고, 복잡한 집계 연산을 효율적으로 수행할 수 있다.핵심 기능타입 변환: 입력 값(타입 V)을 결과 값(타입 U)으로 변환할 수 있음메모리 효율성: 함수들이 첫 번째 인자를 수정하여 반환 가능 (새 객체 생성 불필요)2단계 집계: 파티션 내부와 파티션 간의 두 단계 집계를 지원기본 구문rdd.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None) 주요 매개변수zeroValue:각 키의 집계를 위한 초기값누적기(accumulator)의 시작 상태를 정의seqFunc (시퀀스 함수):파티션 내에서 값들을 처리하는 함수.. 2025. 3. 3.
PySpark Key-Value 쌍 연산 1. 기본 설정 및 데이터 로드from pyspark import SparkContextsc = SparkContext("local", "PySpark Sales Data Examples")# 판매 데이터: (상품, 판매량, 가격)sales_data = sc.parallelize([ ("Apple", 100, 2.5), ("Banana", 150, 1.8), ("Orange", 80, 3.0), ("Apple", 120, 2.5), ("Banana", 200, 1.5), ("Mango", 50, 4.0), ("Orange", 70, 3.2), ("Apple", 90, 2.8)])# Key-Value 쌍 RDD로 변환: (상품, (판매량, 가격, 총액))sales.. 2025. 3. 1.
PySpark의 map과 flatMap 함수 비교 PySpark의 map과 flatMap 함수를 비교한다. map 함수정의map 함수는 RDD의 각 요소에 지정된 함수를 적용하고, 그 결과를 그대로 새로운 RDD의 요소로 사용한다. 특징입력 요소 하나당 출력 요소 하나 (1:1 매핑)입력과 출력 RDD의 요소 수가 동일함함수가 반환하는 값(단일 값, 리스트, 튜플 등)이 그대로 새 RDD의 요소가 됨예제rdd = sc.parallelize([1, 2, 3])# 단일 값 반환result1 = rdd.map(lambda x: x * 2)# 결과: [2, 4, 6]# 리스트 반환result2 = rdd.map(lambda x: [x, x * 2])# 결과: [[1, 2], [2, 4], [3, 6]]# 각 요소가 리스트로 래핑됨 flatMap 함수정의flat.. 2025. 2. 28.
pyspark countByValue 텍스트 파일에서 학점 데이터를 읽고, 각 학점별 발생 빈도를 계산하여 내림차순으로 정렬해 출력하는 예제 전체 코드import pysparksc = pyspark.SparkContext.getOrCreate()sample = "sample/grade.txt"data = sc.textFile(sample)grade = data.map(lambda line : line.split(" ")[1])#각 고유 값이 몇 번 출현하는지 계산하여 결과를 딕셔너리 형태로 반환count = grade.countByValue()# sorted(정렬할 데이터, key 파라미터, reverse 파라미터)for grade, cnt in sorted(count.items(), key=lambda x : x[1], reverse=Tr.. 2025. 2. 28.
pyspark word count 분석 PySpark를 사용한 워드 카운트(Word Count) 알고리즘의 구현과 동작 원리를 분석한다. 전체 코드import pysparktest_file = "hello.txt"sc = pyspark.SparkContext.getOrCreate()text_file = sc.textFile(test_file)counts = text_file.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b)print(counts.collect())hello.txt 파일hello worldhello worldhello 1. SparkContext 생성 .. 2025. 2. 26.
KServe의 Inference Batcher KServe Inference Batcher는 여러 개의 개별 예측 요청을 하나의 배치로 묶어 처리함으로써 전체적인 처리량을 증가시키는 기능이다. 주요 특징구현 위치: KServe 모델 에이전트 사이드카주입 메커니즘: 웹훅을 통한 InferenceService 파드에 주입내부 통신: Go 채널을 사용한 데이터 전송프로토콜 지원: KServe v1 HTTP 프로토콜 (gRPC 미지원) 작동 방식예측 요청이 모델 에이전트 사이드카에 도착Batcher가 요청을 모아 배치 생성배치 완성 시 예측기 컨테이너로 추론 요청 전송  배치 트리거maxBatchSize: 최소 배치 크기. HTTP 요청의 수가 아닌 배치에 포함된 데이터 인스턴스의 수를 의미한다. 최소한 maxBatchSize 만큼의 데이터 인스턴스가 입력.. 2024. 10. 12.
JuiceFS CSI driver를 이용해 MinIO와 HDFS를 Kubernetes와 연동하기 Kubernetes 환경에서 JuiceFS CSI(Container Storage Interface) 드라이버를 사용하여 MinIO와 HDFS를 스토리지 백엔드로 통합하는 방법을 설명한다.JuiceFS CSI 드라이버를 통해 StorageClass를 생성하고, 이를 이용해 PVC를 만들어 Kubernetes POD에서 MinIO와 HDFS 스토리지를 마운트 하여 사용할 수 있다. JuiceFS CSI 드라이버 설치1. Helm 저장소 추가 및 업데이트helm repo add juicefs https://juicedata.github.io/charts/helm repo update 2. 기본 설정 파일 다운로드설정 파일을 다운로드 한 후 필요한 부분은 수정한다. 이 예제에서는 기본 설정을 그대로 사용했다... 2024. 10. 11.
KServe Autoscaler KPA와 HPA 비교 KServe는 Knative Pod Autoscaler (KPA)와 Horizontal Pod Autoscaler (HPA) 두 가지 유형의 auto scaler를 지원한다.KPA는 Knative Serving 설치 시 기본적으로 활성화되지만, HPA를 사용하기 위해서는 별도 설치 및 설정이 필요하다. KPA 제로 스케일링(scale to zero) 기능을 지원한다. Knative Serving 코어의 일부로, Knative Serving 설치 시 기본적으로 활성화된다. CPU, MEMORY 기반 auto scaling을 지원하지 않는다. HTTP 기반 워크로드에 최적화되어 있다.HPA Knative Serving 설치 후 별도로 설치해야 한다. 제로 스케일링(scale to zero) 기능을 지원하지 .. 2024. 10. 11.
KServe Autoscaling & Zero Scale KServe는 Knative를 기반으로 한 강력한 자동 확장 및 제로 스케일 기능을 제공한다. 이 기능들을 통해 사용자는 효율적인 추론 서비스를 구축할 수 있다. Autoscaling 개요 KServe의 자동 확장은 트래픽 패턴에 따라 pod의 수를 동적으로 조정한다. 주요 개념은 다음과 같다. scaleMetric: 확장의 기준이 되는 메트릭scaleTarget: 해당 메트릭의 목푯값. 이 값은 hard limit이 아닌 soft limit으로 요청이 갑자기 급증하여 이 값을 초과하면 새로운 pod가 생성되는 동안 기존 pod가 지정된 값을 초과하여 처리할 수 있다.  scaleMetric 옵션은 다음과 같다. concurrency: 동시 처리 중인 요청 수rps: 초당 요청 수cpu: CPU 사용률m.. 2024. 10. 9.
KServe v2 프로토콜: 모델 메타데이터 개요KServe v2 프로토콜은 모델 메타데이터를 제공하는 기능을 포함하고 있다. 이 기능을 통해 모델의 입력 및 출력 형식, 모델 이름, 버전 등의 정보를 클라이언트에게 제공할 수 있다. 아래 링크에서 개발한 모델에 메타 데이터 구현을 위한 메서드를 추가한다.KServe Custom Predictor 이미지 빌드 가이드 - v2 protocol 프로젝트 구조프로젝트 구조는 다음과 같다.project_root/│├── predictor.py├── requirements.txt├── Dockerfile└── model/ └── model.joblib 모델 메타데이터 구현모델 메타데이터를 제공하기 위해서는 커스텀 예측기 클래스에 다음 메서드들을 구현해야 한다. get_input_types(): 모델 입.. 2024. 10. 1.
반응형