1. 기본 설정 및 데이터 로드
from pyspark import SparkContext
sc = SparkContext("local", "PySpark Sales Data Examples")
# 판매 데이터: (상품, 판매량, 가격)
sales_data = sc.parallelize([
("Apple", 100, 2.5),
("Banana", 150, 1.8),
("Orange", 80, 3.0),
("Apple", 120, 2.5),
("Banana", 200, 1.5),
("Mango", 50, 4.0),
("Orange", 70, 3.2),
("Apple", 90, 2.8)
])
# Key-Value 쌍 RDD로 변환: (상품, (판매량, 가격, 총액))
sales_kv = sales_data.map(lambda x: (x[0], (x[1], x[2], x[1] * x[2])))
## 결과
[('Apple', (100, 2.5, 250.0)),
('Banana', (150, 1.8, 270.0)),
('Orange', (80, 3.0, 240.0)),
('Apple', (120, 2.5, 300.0)),
('Banana', (200, 1.5, 300.0)),
('Mango', (50, 4.0, 200.0)),
('Orange', (70, 3.2, 224.0)),
('Apple', (90, 2.8, 252.0))]
2. 상품별 평균 판매량 계산
avg_quantity = sales_kv.mapValues(lambda x : (x[0], 1))\
.reduceByKey(lambda a, b : (a[0] + b[0], a[1] + b[1]))\
.mapValues(lambda x : x[0] / x[1])
# 결과
[('Orange', 75.0),
('Apple', 103.33333333333333),
('Banana', 175.0),
('Mango', 50.0)]
평균 계산 단계별 설명:
- mapValues: 각 상품의 판매량을 (판매량, 1) 튜플로 변환
- 입력: ('Apple', (100, 2.5, 250.0))
- 출력: ('Apple', (100, 1))
- reduceByKey: 같은 상품의 (판매량 합계, 개수) 계산
- 입력: [('Apple', (100, 1)), ('Apple', (120, 1)), ('Apple', (90, 1))]
- 출력: ('Apple', (310, 3))
- mapValues: 총 판매량을 개수로 나누어 평균 계산
- 입력: ('Apple', (310, 3))
- 출력: ('Apple', 103.33)
3. 상품별 총 판매액 계산
total_sales = sales_kv.mapValues(lambda x : x[2]).reduceByKey(lambda a, b : a + b)
# 결과
[('Orange', 464.0), ('Apple', 802.0), ('Banana', 570.0), ('Mango', 200.0)]
4. 필터링 연산 예제
4.1 판매량이 100개 이상인 거래 필터링
high_volume_sales = sales_kv.filter(lambda x : x[1][0] >= 100)
# 결과
[('Apple', (100, 2.5, 250.0)),
('Banana', (150, 1.8, 270.0)),
('Apple', (120, 2.5, 300.0)),
('Banana', (200, 1.5, 300.0))]
4.2 특정 상품('Apple')의 판매 기록 필터링
apple_sales = sales_kv.filter(lambda x : x[0] == "Apple")
# 결과
[('Apple', (100, 2.5, 250.0)),
('Apple', (120, 2.5, 300.0)),
('Apple', (90, 2.8, 252.0))]
4.3 판매액이 $300 이상인 거래 필터링
sales_with_total = sales_kv.filter(lambda x : x[1][2] >= 300)
# 결과
[('Apple', (120, 2.5, 300.0)), ('Banana', (200, 1.5, 300.0))]
5. 최소/최대 판매량 계산
5.1 상품별 최소 판매량
min_quantity = sales_kv.mapValues(lambda x : x[0]).reduceByKey(lambda a ,b : min(a, b))
# 결과
[('Orange', 70), ('Apple', 90), ('Banana', 150), ('Mango', 50)]
5.2 상품별 최대 판매량
max_quantity = sales_kv.mapValues(lambda x : x[0]).reduceByKey(lambda a ,b : max(a, b))
# 결과
[('Orange', 80), ('Apple', 120), ('Banana', 200), ('Mango', 50)]
5.3 전체 데이터에서 가장 높은 판매액 거래
Apple과 Banana 모두 판매액이 300.0일 때 Key 기준으로 오름차순이 적용되어, Apple이 출력된다.
highest_sale = sales_kv.max(lambda x : x[1][2])
# 결과 (예시)
('Apple', (120, 2.5, 300.0))
6. 날짜별 그룹핑
# 날짜 정보가 있는 데이터셋
sales_with_date = sc.parallelize([
("Apple", "2023-01-01", 100, 2.5),
("Banana", "2023-01-01", 150, 1.8),
("Apple", "2023-01-02", 120, 2.5),
("Banana", "2023-01-02", 200, 1.5),
("Apple", "2023-01-03", 90, 2.8),
("Banana", "2023-01-03", 180, 1.6),
("Apple", "2023-01-03", 110, 2.6),
("Banana", "2023-01-03", 195, 1.7)
])
sales_by_date = sales_with_date.map(lambda x: ((x[0], x[1]), x[2]))
daily_sales = sales_by_date.reduceByKey(lambda a, b : a + b)
sales_by_day = daily_sales.map(lambda x: (x[0][1], (x[0][0], x[1]))).groupByKey().mapValues(list).sortByKey()
# 결과
# 2023-01-01: [('Apple', 100), ('Banana', 150)]
# 2023-01-02: [('Apple', 120), ('Banana', 200)]
# ...
단계별 설명:
- map: 복합 키 구성
- 원본 데이터에서 (상품, 날짜)를 키로, 판매량을 값으로 추출
- 입력: ("Apple", "2023-01-01", 100, 2.5)
- 출력: (("Apple", "2023-01-01"), 100)
- reduceByKey: 같은 날짜, 같은 상품의 판매량 합산
- 동일한 (상품, 날짜) 조합에 대한 판매량을 합산
- 입력: [(("Apple", "2023-01-03"), 90), (("Apple", "2023-01-03"), 110)]
- 출력: (("Apple", "2023-01-03"), 200)
- map: 키 재구성 (날짜 중심)
- 키를 날짜로 변경하고, 값을 (상품, 판매량) 형태로 재구성
- 입력: (("Apple", "2023-01-03"), 200)
- 출력: ("2023-01-03", ("Apple", 200))
- groupByKey: 날짜별 그룹화
- 날짜를 기준으로 모든 상품 판매량 정보를 그룹화
- 입력: [("2023-01-03", ("Apple", 200)), ("2023-01-03", ("Banana", 375))]
- 출력: ("2023-01-03", <iterator>)
- mapValues: 이터레이터를 리스트로 변환
- 그룹화된 이터레이터 값을 구체적인 리스트로 변환
- 입력: ("2023-01-03", <iterator>)
- 출력: ("2023-01-03", [("Apple", 200), ("Banana", 375)])
- sortByKey: 날짜순 정렬
- 날짜 키를 기준으로 오름차순 정렬
7. Key-Value 쌍 RDD 주요 작업 비교표
작업 | 설명 | 예시 |
mapValues | 값만 변환 (키는 유지) | rdd.mapValues(lambda x: x * 2) |
reduceByKey | 같은 키를 가진 값들을 하나로 병합 | rdd.reduceByKey(lambda a, b: a + b) |
groupByKey | 같은 키를 가진 값들을 그룹화 | rdd.groupByKey() |
sortByKey | 키를 기준으로 정렬 | rdd.sortByKey() |
countByKey | 각 키별 요소 개수 계산 | rdd.countByKey() |
'pyspark' 카테고리의 다른 글
pyspark aggregateByKey (0) | 2025.03.03 |
---|---|
PySpark의 map과 flatMap 함수 비교 (0) | 2025.02.28 |
pyspark countByValue (0) | 2025.02.28 |
pyspark word count 분석 (0) | 2025.02.26 |
댓글