본문 바로가기
pyspark

pyspark aggregateByKey

by kyeongseo.oh 2025. 3. 3.

aggregateByKey 함수는 RDD의 모든 값을 결합하여 하나의 결과값을 생성한다. 입력 값과 다른 타입의 결과를 생성할 수 있고, 복잡한 집계 연산을 효율적으로 수행할 수 있다.

핵심 기능

  1. 타입 변환: 입력 값(타입 V)을 결과 값(타입 U)으로 변환할 수 있음
  2. 메모리 효율성: 함수들이 첫 번째 인자를 수정하여 반환 가능 (새 객체 생성 불필요)
  3. 2단계 집계: 파티션 내부와 파티션 간의 두 단계 집계를 지원

기본 구문

rdd.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)
  1.  

주요 매개변수

  1. zeroValue:
    • 각 키의 집계를 위한 초기값
    • 누적기(accumulator)의 시작 상태를 정의
  2. seqFunc (시퀀스 함수):
    • 파티션 내에서 값들을 처리하는 함수
    • 형식: (누적기, 값) → 새 누적기
    • 누적기는 zeroValue와 같은 타입(U)
    • 값은 RDD의 원래 값 타입(V)
  3. combFunc (결합 함수):
    • 파티션 간에 결과를 결합하는 함수
    • 형식: (누적기1, 누적기2) → 결합된 누적기
    • 두 누적기 모두 zeroValue와 같은 타입(U)
  4. numPartitions (optional):
    • 결과 RDD의 파티션 수
    • 성능 최적화를 위해 사용됨

작동 원리

  1. 각 키에 대해 zeroValue로 초기화된 누적기가 생성됨
  2. 각 파티션 내에서:
    • 동일한 키를 가진 각 값에 대해 seqFunc를 적용하여 누적기 업데이트
  3. 모든 파티션 처리 후:
    • 동일한 키를 가진 모든 파티션의 누적기가 combFunc를 통해 결합됨

예제 코드

import pyspark 

sc = pyspark.SparkContext.getOrCreate()

# 판매 데이터: (상품, 판매량, 가격)
sales_data = sc.parallelize([
    ("Apple", 100, 2.5),
    ("Banana", 150, 1.8),
    ("Orange", 80, 3.0),
    ("Apple", 120, 2.5),
    ("Banana", 200, 1.5),
    ("Mango", 50, 4.0),
    ("Orange", 70, 3.2),
    ("Apple", 90, 2.8)
])

# Key-Value 쌍 RDD로 변환: (상품, (판매량, 가격, 총액))
sales_kv = sales_data.map(lambda x : (x[0], (x[1], x[2], round(x[1] * x[2], 1))))

def seq_op(acc, value):
    # acc: (거래 수, 총 판매량, 총 판매액, 최소 판매량, 최대 판매량)
    # values: (판매량, 가격, 판매액)
    
    count, total_qty, total_sales, min_qty, max_qty = acc # 사용자 정의 acc 초기값
    qty, price, sales = value # sales_kv의 value
    
    # 초기값 acc와 value를 연산
    return (count + 1, total_qty + qty, total_sales + sales, min(min_qty, qty), max(max_qty, qty))

def comb_op(acc1, acc2):
    count1, total_qty1, total_sales1, min_qty1, max_qty1 = acc1
    count2, total_qty2, total_sales2, min_qty2, max_qty2 = acc2
    
    #seq_op의 결과인 acc끼리 연산
    return (
        count1 + count2, 
        total_qty1 + total_qty2, 
        total_sales1 + total_sales2, 
        min(min_qty1, min_qty2), 
        max(max_qty1, max_qty2)
    )

product_stats = sales_kv.aggregateByKey((0, 0, 0, float('inf'), float('-inf')), #  초기값
                                        seq_op, 
                                        comb_op)

product_stats.sortByKey().collect()

 

코드 분석

시퀀스 연산 함수 (seq_op) 정의

 

  • 파티션 내에서 각 키의 값을 처리하는 함수
  • acc: 현재까지의 누적 결과 (사용자 정의 초기값 또는 이전 계산 결과)
  • value: 현재 처리 중인 거래 데이터 (sales_kv의 value)
  • 반환값: 업데이트된 누적기 튜플
    • 거래 수 증가
    • 판매량 합산
    • 판매액 합산
    • 최소 판매량 갱신
    • 최대 판매량 갱신

결합 연산 함수 (comb_op) 정의

 

  • 서로 다른 파티션에서 계산된 결과를 병합하는 함수
  • acc1, acc2: seq_op 함수에서 반환된 누적기 튜플들
  • 반환값: 두 누적기를 결합한 결과
    • 거래 수 합산
    • 총 판매량 합산
    • 총 판매액 합산
    • 전체 최소 판매량 결정
    • 전체 최대 판매량 결정

aggregateByKey 연산

 

  • 각 상품별로 통계를 집계
  • 초기값: (0, 0, 0, float('inf'), float('-inf'))
    • 거래 수 = 0
    • 총 판매량 = 0
    • 총 판매액 = 0
    • 최소 판매량 = 양의 무한대 ( 첫 번째 비교 시 어떤 유한한 수라도 더 작게 된다.)
    • 최대 판매량 = 음의 무한대 (첫 번째 비교 시 어떤 유한한 수라도 더 크게 된다.)

결과

[
  ('Apple', (3, 310, 802.0, 90, 120)),
  ('Banana', (2, 350, 570.0, 150, 200)),
  ('Mango', (1, 50, 200.0, 50, 50)),
  ('Orange', (2, 150, 464.0, 70, 80))
]

 

 

 

'pyspark' 카테고리의 다른 글

PySpark Key-Value 쌍 연산  (0) 2025.03.01
PySpark의 map과 flatMap 함수 비교  (0) 2025.02.28
pyspark countByValue  (0) 2025.02.28
pyspark word count 분석  (0) 2025.02.26

댓글