aggregateByKey 함수는 RDD의 모든 값을 결합하여 하나의 결과값을 생성한다. 입력 값과 다른 타입의 결과를 생성할 수 있고, 복잡한 집계 연산을 효율적으로 수행할 수 있다.
핵심 기능
- 타입 변환: 입력 값(타입 V)을 결과 값(타입 U)으로 변환할 수 있음
- 메모리 효율성: 함수들이 첫 번째 인자를 수정하여 반환 가능 (새 객체 생성 불필요)
- 2단계 집계: 파티션 내부와 파티션 간의 두 단계 집계를 지원
기본 구문
rdd.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)
주요 매개변수
- zeroValue:
- 각 키의 집계를 위한 초기값
- 누적기(accumulator)의 시작 상태를 정의
- seqFunc (시퀀스 함수):
- 파티션 내에서 값들을 처리하는 함수
- 형식: (누적기, 값) → 새 누적기
- 누적기는 zeroValue와 같은 타입(U)
- 값은 RDD의 원래 값 타입(V)
- combFunc (결합 함수):
- 파티션 간에 결과를 결합하는 함수
- 형식: (누적기1, 누적기2) → 결합된 누적기
- 두 누적기 모두 zeroValue와 같은 타입(U)
- numPartitions (optional):
- 결과 RDD의 파티션 수
- 성능 최적화를 위해 사용됨
작동 원리
- 각 키에 대해 zeroValue로 초기화된 누적기가 생성됨
- 각 파티션 내에서:
- 동일한 키를 가진 각 값에 대해 seqFunc를 적용하여 누적기 업데이트
- 모든 파티션 처리 후:
- 동일한 키를 가진 모든 파티션의 누적기가 combFunc를 통해 결합됨
예제 코드
import pyspark
sc = pyspark.SparkContext.getOrCreate()
# 판매 데이터: (상품, 판매량, 가격)
sales_data = sc.parallelize([
("Apple", 100, 2.5),
("Banana", 150, 1.8),
("Orange", 80, 3.0),
("Apple", 120, 2.5),
("Banana", 200, 1.5),
("Mango", 50, 4.0),
("Orange", 70, 3.2),
("Apple", 90, 2.8)
])
# Key-Value 쌍 RDD로 변환: (상품, (판매량, 가격, 총액))
sales_kv = sales_data.map(lambda x : (x[0], (x[1], x[2], round(x[1] * x[2], 1))))
def seq_op(acc, value):
# acc: (거래 수, 총 판매량, 총 판매액, 최소 판매량, 최대 판매량)
# values: (판매량, 가격, 판매액)
count, total_qty, total_sales, min_qty, max_qty = acc # 사용자 정의 acc 초기값
qty, price, sales = value # sales_kv의 value
# 초기값 acc와 value를 연산
return (count + 1, total_qty + qty, total_sales + sales, min(min_qty, qty), max(max_qty, qty))
def comb_op(acc1, acc2):
count1, total_qty1, total_sales1, min_qty1, max_qty1 = acc1
count2, total_qty2, total_sales2, min_qty2, max_qty2 = acc2
#seq_op의 결과인 acc끼리 연산
return (
count1 + count2,
total_qty1 + total_qty2,
total_sales1 + total_sales2,
min(min_qty1, min_qty2),
max(max_qty1, max_qty2)
)
product_stats = sales_kv.aggregateByKey((0, 0, 0, float('inf'), float('-inf')), # 초기값
seq_op,
comb_op)
product_stats.sortByKey().collect()
코드 분석
시퀀스 연산 함수 (seq_op) 정의
- 파티션 내에서 각 키의 값을 처리하는 함수
- acc: 현재까지의 누적 결과 (사용자 정의 초기값 또는 이전 계산 결과)
- value: 현재 처리 중인 거래 데이터 (sales_kv의 value)
- 반환값: 업데이트된 누적기 튜플
- 거래 수 증가
- 판매량 합산
- 판매액 합산
- 최소 판매량 갱신
- 최대 판매량 갱신
결합 연산 함수 (comb_op) 정의
- 서로 다른 파티션에서 계산된 결과를 병합하는 함수
- acc1, acc2: seq_op 함수에서 반환된 누적기 튜플들
- 반환값: 두 누적기를 결합한 결과
- 거래 수 합산
- 총 판매량 합산
- 총 판매액 합산
- 전체 최소 판매량 결정
- 전체 최대 판매량 결정
aggregateByKey 연산
- 각 상품별로 통계를 집계
- 초기값: (0, 0, 0, float('inf'), float('-inf'))
- 거래 수 = 0
- 총 판매량 = 0
- 총 판매액 = 0
- 최소 판매량 = 양의 무한대 ( 첫 번째 비교 시 어떤 유한한 수라도 더 작게 된다.)
- 최대 판매량 = 음의 무한대 (첫 번째 비교 시 어떤 유한한 수라도 더 크게 된다.)
결과
[
('Apple', (3, 310, 802.0, 90, 120)),
('Banana', (2, 350, 570.0, 150, 200)),
('Mango', (1, 50, 200.0, 50, 50)),
('Orange', (2, 150, 464.0, 70, 80))
]
'pyspark' 카테고리의 다른 글
PySpark Key-Value 쌍 연산 (0) | 2025.03.01 |
---|---|
PySpark의 map과 flatMap 함수 비교 (0) | 2025.02.28 |
pyspark countByValue (0) | 2025.02.28 |
pyspark word count 분석 (0) | 2025.02.26 |
댓글