멀티프로세싱 vs 멀티스레딩
멀티스레딩은 하나의 프로세스 내에서 여러 실행 흐름을 만드는 방식으로, I/O-bound 작업(예: 파일 읽기, 웹 요청 등)에 적합하다. 하지만 Python은 GIL(Global Interpreter Lock)이라는 내부 구조 때문에, 동시에 여러 스레드가 CPU를 점유할 수 없다. 그래서 멀티스레딩으로 CPU-bound 작업(예: 수치 연산, 이미지 처리)을 처리하면 병렬화 효과가 거의 없다.
반면, 멀티프로세싱은 프로세스를 여러 개 생성하여 병렬로 실행하기 때문에, GIL의 영향을 받지 않으며 CPU를 효율적으로 활용할 수 있다. 각각의 프로세스는 독립적인 Python 인터프리터와 메모리 공간을 가지므로, 완전한 병렬처리가 가능하다.
멀티스레딩 vs 멀티프로세싱 시간 비교
import time
from threading import Thread
from multiprocessing import Process
def cpu_bound():
sum(x*x for x in range(10**6))
start = time.time()
threads = [Thread(target=cpu_bound) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print("[Threading]", time.time() - start)
start = time.time()
processes = [Process(target=cpu_bound) for _ in range(4)]
for p in processes: p.start()
for p in processes: p.join()
print("[Multiprocessing]", time.time() - start)
'''return
[Threading] 0.25305747985839844
[Multiprocessing] 0.10302376747131348
'''
기본 구조: multiprocessing.Process
multiprocessing 모듈의 Process 클래스를 사용하면, 독립적인 프로세스를 생성할 수 있다. 각각의 프로세스는 메인 프로세스와 별도로 실행되며, 병렬 작업에 활용할 수 있다.
from multiprocessing import Process
import os
def worker(name):
print(f"Worker {name} running in PID {os.getpid()}")
if __name__ == "__main__":
p1 = Process(target=worker, args=("A",))
p2 = Process(target=worker, args=("B",))
p1.start()
p2.start()
p1.join()
p2.join()
'''
Worker A running in PID 26692
Worker B running in PID 4776
'''
- start()를 호출하면 프로세스가 실행된다.
- join()은 해당 프로세스가 끝날 때까지 대기한다.
- 각 프로세스는 독립적인 PID를 가진다.
프로세스 간 통신 (IPC)
서로 다른 프로세스는 메모리를 공유하지 않기 때문에, 데이터를 주고받기 위해 IPC(Inter-Process Communication) 도구가 필요하다.
Queue 예제: 안전한 FIFO 통신
Queue를 활용하면 데이터를 안전하게 공유할 수 있으며, 병렬 작업 시 생산자-소비자 구조를 구현할 수 있다.
from multiprocessing import Process, Queue
def producer(q):
for i in range(5):
q.put(i) # 데이터를 큐에 넣는다
def consumer(q):
while not q.empty():
print("Consumed", q.get()) # 큐에서 데이터를 꺼낸다
if __name__ == "__main__":
q = Queue() # 프로세스 간 통신을 위한 큐 생성
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start(); p1.join()
p2.start(); p2.join()
'''
Consumed 0
Consumed 1
Consumed 2
Consumed 3
Consumed 4
'''
Pipe 예제: 1대1 통신 구조
Pipe는 두 프로세스 간에 생성된 두 연결 객체를 통해 1대1로 통신하는 구조로, 직접 메시지를 주고받는 데 적합하다. 구조가 간단하고 빠르지만, 1대1 통신에 특화되어 있기 때문에 다수의 소비자가 동시에 접근하는 구조에는 부적합하다.
from multiprocessing import Process, Pipe
def sender(conn):
conn.send("Hello from child") # 데이터를 전송한다
conn.close() # 전송이 끝나면 연결을 닫는다
def receiver(conn):
print("Received:", conn.recv()) # 데이터를 수신한다
if __name__ == '__main__':
# Pipe()는 양쪽 끝을 나타내는 두 개의 연결 객체를 반환한다
parent_conn, child_conn = Pipe()
p1 = Process(target=sender, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))
p1.start(); p2.start()
p1.join(); p2.join()
'''
Received: Hello from child
'''
Pipe는 단순한 메시지 교환에 적합하며, 두 프로세스 사이에 직접적이고 빠른 통신이 필요한 경우 유용하다. 그러나 병렬로 여러 생산자나 소비자가 필요할 경우에는 Queue를 사용하는 것이 더 바람직하다.
Value와 Array 예제: 공유 메모리 객체
Value와 Array를 활용하면, 간단한 형태의 데이터를 여러 프로세스 간에 공유할 수 있다. 값이 메모리 공유 방식으로 처리된다.
from multiprocessing import Process, Value, Array
def modify(val, arr):
val.value += 1 # 공유 변수 값을 수정한다
for i in range(len(arr)):
arr[i] *= 2 # 배열의 각 원소를 2배로 만든다
if __name__ == '__main__':
val = Value('i', 10, lock=True) # 정수형 공유 변수
arr = Array('i', [1, 2, 3, 4], lock=True) # 정수형 배열 공유 변수
p = Process(target=modify, args=(val, arr))
p.start(); p.join()
print("Value:", val.value)
print("Array:", list(arr))
'''
Value: 11
Array: [2, 4, 6, 8]
'''
typecode_to_type에는 아래와 같은 값들이 들어갈 수 있다.
typecode_to_type = {
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong,
'f': ctypes.c_float, 'd': ctypes.c_double
}
Pool을 활용한 병렬 처리
Pool은 프로세스 풀을 미리 만들어 두고, 작업을 나누어 처리하는 방식이다. 주로 대량의 데이터를 처리할 때 효율적이다.
- map(): 입력 데이터를 병렬로 매핑하여 결과 리스트를 반환한다.
- apply_async(): 비동기 방식으로 작업을 요청한다.
Pool은 내부적으로 작업을 분할하여 병렬 처리하고, 작업이 끝나면 자동으로 자원을 반환한다.
from multiprocessing import Pool
import os
import time
# 워커 함수
def square(x):
time.sleep(0.5)
pid = os.getpid()
print(f"[PID {pid}] Processing {x}")
return x * x
if __name__ == "__main__":
with Pool(processes=4) as pool:
result = pool.map(square, range(10), chunksize=3)
print("최종 결과:", result)
'''
[PID 27244] Processing 0
[PID 13348] Processing 3
[PID 15304] Processing 9
[PID 21216] Processing 6
[PID 27244] Processing 1
[PID 21216] Processing 7
[PID 13348] Processing 4
[PID 27244] Processing 2
[PID 21216] Processing 8
[PID 13348] Processing 5
최종 결과: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
'''
프로세스 동기화
멀티프로세싱 환경에서 여러 프로세스가 동시에 동일한 자원에 접근할 경우, 데이터 충돌이나 레이스 컨디션이 발생할 수 있다. 이를 방지하기 위해 Python은 다음과 같은 동기화 도구를 제공한다.
Lock
Lock은 임계영역을 보호하여 동시에 하나의 프로세스만 접근하도록 제어한다.
from multiprocessing import Process, Value, Lock
def increment(count, lock=None):
for _ in range(100000):
if lock:
with lock:
count.value += 1
else:
count.value += 1
if __name__ == '__main__':
# 공유 변수
count = Value('i', 0)
# Lock 없이 테스트
p1 = Process(target=increment, args=(count,))
p2 = Process(target=increment, args=(count,))
p1.start(); p2.start()
p1.join(); p2.join()
print("Without Lock:", count.value) # 예상값보다 작게 나올 수 있음 (레이스 컨디션)
# 초기화
count = Value('i', 0)
lock = Lock()
# Lock 사용
p1 = Process(target=increment, args=(count, lock))
p2 = Process(target=increment, args=(count, lock))
p1.start(); p2.start()
p1.join(); p2.join()
print("With Lock:", count.value) # 정확히 200000
'''
Without Lock: 101448
With Lock: 200000
'''
Event
Event는 하나의 프로세스에서 신호를 보내고, 다른 프로세스가 이를 기다리는 구조다.
from multiprocessing import Process, Event
import time
import random
import os
def chef(order_placed: Event, food_ready: Event):
print(f"[Chef - PID {os.getpid()}] 주문 대기 중...")
order_placed.wait() # 주문 신호 올 때까지 대기
print(f"[Chef - PID {os.getpid()}] 요리 시작!")
cook_time = random.randint(1, 3)
time.sleep(cook_time)
print(f"[Chef - PID {os.getpid()}] 음식 완성! (조리시간: {cook_time}초)")
food_ready.set() # 음식 준비 완료 신호
def server(order_placed: Event, food_ready: Event):
print(f"[Server - PID {os.getpid()}] 주문 받는 중...")
time.sleep(random.randint(1, 2))
print(f"[Server - PID {os.getpid()}] 주문 완료, 주방에 전달")
order_placed.set() # 주문 신호 보내기
print(f"[Server - PID {os.getpid()}] 음식 나올 때까지 대기 중...")
food_ready.wait() # 음식 나올 때까지 대기
print(f"[Server - PID {os.getpid()}] 음식 서빙 중...")
time.sleep(1)
print(f"[Server - PID {os.getpid()}] 서빙 완료!")
if __name__ == '__main__':
from multiprocessing import set_start_method
set_start_method("spawn") # Windows/macOS 안전을 위해
# 프로세스 간 공유 가능한 이벤트 객체 생성
order_placed = Event()
food_ready = Event()
# 프로세스 생성
chef_proc = Process(target=chef, args=(order_placed, food_ready))
server_proc = Process(target=server, args=(order_placed, food_ready))
# 실행
chef_proc.start()
server_proc.start()
# 종료 대기
chef_proc.join()
server_proc.join()
print("레스토랑 영업 종료!")
'''
[Chef - PID 11676] 주문 대기 중...
[Server - PID 31776] 주문 받는 중...
[Server - PID 31776] 주문 완료, 주방에 전달
[Server - PID 31776] 음식 나올 때까지 대기 중...
[Chef - PID 11676] 요리 시작!
[Chef - PID 11676] 음식 완성! (조리시간: 2초)
[Server - PID 31776] 음식 서빙 중...
[Server - PID 31776] 서빙 완료!
레스토랑 영업 종료!
'''
Semaphore
Semaphore는 동시 접근 가능한 프로세스 수를 제한할 수 있다.
from multiprocessing import Process, Semaphore
import time
def limited_task(sema, i):
with sema:
print(f"Process {i}")
time.sleep(1)
if __name__ == '__main__':
sema = Semaphore(2) # 최대 2개 프로세스만 동시 진입 허용
processes = [Process(target=limited_task, args=(sema, i)) for i in range(5)]
for p in processes: p.start()
for p in processes: p.join()
예외 처리 및 종료 제어
멀티프로세싱에서 자식 프로세스에서 발생한 예외는 기본적으로 메인 프로세스에 전파되지 않는다.
프로세스 상태 확인 및 강제 종료
from multiprocessing import Process
import time
def long_task():
time.sleep(5)
if __name__ == '__main__':
p = Process(target=long_task)
p.start()
print("Is alive:", p.is_alive())
p.terminate()
p.join()
print("Exit code:", p.exitcode)
- is_alive(): 프로세스가 살아있는지 확인한다.
- terminate(): 프로세스를 강제로 종료한다.
- exitcode: 종료 상태를 확인한다. 0이면 정상, -15는 강제 종료 등.
tqdm 연동 예제
multiprocessing과 tqdm은 함께 사용하기 까다롭지만, imap() 또는 수동으로 업데이트를 통해 구현할 수 있다.
from multiprocessing import Pool
from tqdm import tqdm
def compute(x):
return x * x
if __name__ == '__main__':
with Pool(4) as pool:
# lazy iterator로 처리되는 만큼 tqdm이 하나씩 업데이트
results = list(tqdm(pool.imap(compute, range(10000)), total=10000))
print(results[:10])
- imap()이 처리 완료된 결과를 하나씩 보내주기 때문에, tqdm은 한 번씩 받을 때마다 update() 가능
- map()은 결과를 한꺼번에 반환하므로, tqdm이 끝날 때까지 아무 것도 못 그림
'python' 카테고리의 다른 글
파이썬 기초 - 비동기 프로그래밍 (0) | 2025.03.25 |
---|---|
파이썬 기초 - 멀티 스레드 (0) | 2025.03.25 |
파이썬 기초 - 유용한 자료구조 - collections와 enum (0) | 2025.03.23 |
파이썬 기초 - 메타 클래스와 추상 클래스 (0) | 2025.03.23 |
파이썬 기초 - 메서드 유형과 매직 메서드 (0) | 2025.03.22 |
댓글