본문 바로가기

pyspark5

pyspark aggregateByKey aggregateByKey 함수는 RDD의 모든 값을 결합하여 하나의 결과값을 생성한다. 입력 값과 다른 타입의 결과를 생성할 수 있고, 복잡한 집계 연산을 효율적으로 수행할 수 있다.핵심 기능타입 변환: 입력 값(타입 V)을 결과 값(타입 U)으로 변환할 수 있음메모리 효율성: 함수들이 첫 번째 인자를 수정하여 반환 가능 (새 객체 생성 불필요)2단계 집계: 파티션 내부와 파티션 간의 두 단계 집계를 지원기본 구문rdd.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None) 주요 매개변수zeroValue:각 키의 집계를 위한 초기값누적기(accumulator)의 시작 상태를 정의seqFunc (시퀀스 함수):파티션 내에서 값들을 처리하는 함수.. 2025. 3. 3.
PySpark Key-Value 쌍 연산 1. 기본 설정 및 데이터 로드from pyspark import SparkContextsc = SparkContext("local", "PySpark Sales Data Examples")# 판매 데이터: (상품, 판매량, 가격)sales_data = sc.parallelize([ ("Apple", 100, 2.5), ("Banana", 150, 1.8), ("Orange", 80, 3.0), ("Apple", 120, 2.5), ("Banana", 200, 1.5), ("Mango", 50, 4.0), ("Orange", 70, 3.2), ("Apple", 90, 2.8)])# Key-Value 쌍 RDD로 변환: (상품, (판매량, 가격, 총액))sales.. 2025. 3. 1.
PySpark의 map과 flatMap 함수 비교 PySpark의 map과 flatMap 함수를 비교한다. map 함수정의map 함수는 RDD의 각 요소에 지정된 함수를 적용하고, 그 결과를 그대로 새로운 RDD의 요소로 사용한다. 특징입력 요소 하나당 출력 요소 하나 (1:1 매핑)입력과 출력 RDD의 요소 수가 동일함함수가 반환하는 값(단일 값, 리스트, 튜플 등)이 그대로 새 RDD의 요소가 됨예제rdd = sc.parallelize([1, 2, 3])# 단일 값 반환result1 = rdd.map(lambda x: x * 2)# 결과: [2, 4, 6]# 리스트 반환result2 = rdd.map(lambda x: [x, x * 2])# 결과: [[1, 2], [2, 4], [3, 6]]# 각 요소가 리스트로 래핑됨 flatMap 함수정의flat.. 2025. 2. 28.
pyspark countByValue 텍스트 파일에서 학점 데이터를 읽고, 각 학점별 발생 빈도를 계산하여 내림차순으로 정렬해 출력하는 예제 전체 코드import pysparksc = pyspark.SparkContext.getOrCreate()sample = "sample/grade.txt"data = sc.textFile(sample)grade = data.map(lambda line : line.split(" ")[1])#각 고유 값이 몇 번 출현하는지 계산하여 결과를 딕셔너리 형태로 반환count = grade.countByValue()# sorted(정렬할 데이터, key 파라미터, reverse 파라미터)for grade, cnt in sorted(count.items(), key=lambda x : x[1], reverse=Tr.. 2025. 2. 28.
pyspark word count 분석 PySpark를 사용한 워드 카운트(Word Count) 알고리즘의 구현과 동작 원리를 분석한다. 전체 코드import pysparktest_file = "hello.txt"sc = pyspark.SparkContext.getOrCreate()text_file = sc.textFile(test_file)counts = text_file.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b)print(counts.collect())hello.txt 파일hello worldhello worldhello 1. SparkContext 생성 .. 2025. 2. 26.
반응형