본문 바로가기
python

파이썬 기초 - 멀티 프로세싱

by kyeongseo.oh 2025. 3. 25.

멀티프로세싱 vs 멀티스레딩

멀티스레딩은 하나의 프로세스 내에서 여러 실행 흐름을 만드는 방식으로, I/O-bound 작업(예: 파일 읽기, 웹 요청 등)에 적합하다. 하지만 Python은 GIL(Global Interpreter Lock)이라는 내부 구조 때문에, 동시에 여러 스레드가 CPU를 점유할 수 없다. 그래서 멀티스레딩으로 CPU-bound 작업(예: 수치 연산, 이미지 처리)을 처리하면 병렬화 효과가 거의 없다.

반면, 멀티프로세싱은 프로세스를 여러 개 생성하여 병렬로 실행하기 때문에, GIL의 영향을 받지 않으며 CPU를 효율적으로 활용할 수 있다. 각각의 프로세스는 독립적인 Python 인터프리터와 메모리 공간을 가지므로, 완전한 병렬처리가 가능하다.

멀티스레딩 vs 멀티프로세싱 시간 비교

import time
from threading import Thread
from multiprocessing import Process

def cpu_bound():
    sum(x*x for x in range(10**6))

start = time.time()
threads = [Thread(target=cpu_bound) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print("[Threading]", time.time() - start)

start = time.time()
processes = [Process(target=cpu_bound) for _ in range(4)]
for p in processes: p.start()
for p in processes: p.join()
print("[Multiprocessing]", time.time() - start)

'''return
[Threading] 0.25305747985839844
[Multiprocessing] 0.10302376747131348
'''

기본 구조: multiprocessing.Process

multiprocessing 모듈의 Process 클래스를 사용하면, 독립적인 프로세스를 생성할 수 있다. 각각의 프로세스는 메인 프로세스와 별도로 실행되며, 병렬 작업에 활용할 수 있다.

from multiprocessing import Process
import os

def worker(name):
    print(f"Worker {name} running in PID {os.getpid()}")

if __name__ == "__main__":
    p1 = Process(target=worker, args=("A",))
    p2 = Process(target=worker, args=("B",))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
'''
Worker A running in PID 26692
Worker B running in PID 4776
'''
  • start()를 호출하면 프로세스가 실행된다.
  • join()은 해당 프로세스가 끝날 때까지 대기한다.
  • 각 프로세스는 독립적인 PID를 가진다.

프로세스 간 통신 (IPC)

서로 다른 프로세스는 메모리를 공유하지 않기 때문에, 데이터를 주고받기 위해 IPC(Inter-Process Communication) 도구가 필요하다.

Queue 예제: 안전한 FIFO 통신

Queue를 활용하면 데이터를 안전하게 공유할 수 있으며, 병렬 작업 시 생산자-소비자 구조를 구현할 수 있다.

from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        q.put(i)  # 데이터를 큐에 넣는다

def consumer(q):
    while not q.empty():
        print("Consumed", q.get())  # 큐에서 데이터를 꺼낸다

if __name__ == "__main__":
    q = Queue()  # 프로세스 간 통신을 위한 큐 생성
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start(); p1.join()
    p2.start(); p2.join()

'''
Consumed 0
Consumed 1
Consumed 2
Consumed 3
Consumed 4
'''

 

Pipe 예제: 1대1 통신 구조

Pipe는 두 프로세스 간에 생성된 두 연결 객체를 통해 1대1로 통신하는 구조로, 직접 메시지를 주고받는 데 적합하다. 구조가 간단하고 빠르지만, 1대1 통신에 특화되어 있기 때문에 다수의 소비자가 동시에 접근하는 구조에는 부적합하다.

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from child")  # 데이터를 전송한다
    conn.close()  # 전송이 끝나면 연결을 닫는다

def receiver(conn):
    print("Received:", conn.recv())  # 데이터를 수신한다

if __name__ == '__main__':
    # Pipe()는 양쪽 끝을 나타내는 두 개의 연결 객체를 반환한다
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start(); p2.start()
    p1.join(); p2.join()

'''
Received: Hello from child
'''

Pipe는 단순한 메시지 교환에 적합하며, 두 프로세스 사이에 직접적이고 빠른 통신이 필요한 경우 유용하다. 그러나 병렬로 여러 생산자나 소비자가 필요할 경우에는 Queue를 사용하는 것이 더 바람직하다.

Value와 Array 예제: 공유 메모리 객체

Value와 Array를 활용하면, 간단한 형태의 데이터를 여러 프로세스 간에 공유할 수 있다. 값이 메모리 공유 방식으로 처리된다.

from multiprocessing import Process, Value, Array

def modify(val, arr):
    val.value += 1  # 공유 변수 값을 수정한다
    for i in range(len(arr)):
        arr[i] *= 2  # 배열의 각 원소를 2배로 만든다

if __name__ == '__main__':
    val = Value('i', 10, lock=True)      # 정수형 공유 변수
    arr = Array('i', [1, 2, 3, 4], lock=True)  # 정수형 배열 공유 변수
    p = Process(target=modify, args=(val, arr))
    p.start(); p.join()
    print("Value:", val.value)
    print("Array:", list(arr))
    
'''
Value: 11
Array: [2, 4, 6, 8]
'''

 

 

typecode_to_type에는 아래와 같은 값들이 들어갈 수 있다.

typecode_to_type = {
    'c': ctypes.c_char,     'u': ctypes.c_wchar,
    'b': ctypes.c_byte,     'B': ctypes.c_ubyte,
    'h': ctypes.c_short,    'H': ctypes.c_ushort,
    'i': ctypes.c_int,      'I': ctypes.c_uint,
    'l': ctypes.c_long,     'L': ctypes.c_ulong,
    'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong,
    'f': ctypes.c_float,    'd': ctypes.c_double
    }

Pool을 활용한 병렬 처리

Pool은 프로세스 풀을 미리 만들어 두고, 작업을 나누어 처리하는 방식이다. 주로 대량의 데이터를 처리할 때 효율적이다.

  • map(): 입력 데이터를 병렬로 매핑하여 결과 리스트를 반환한다.
  • apply_async(): 비동기 방식으로 작업을 요청한다.

Pool은 내부적으로 작업을 분할하여 병렬 처리하고, 작업이 끝나면 자동으로 자원을 반환한다.

from multiprocessing import Pool
import os
import time

# 워커 함수
def square(x):
    time.sleep(0.5)
    pid = os.getpid()
    print(f"[PID {pid}] Processing {x}")
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        result = pool.map(square, range(10), chunksize=3)
        print("최종 결과:", result)
        
'''
[PID 27244] Processing 0
[PID 13348] Processing 3
[PID 15304] Processing 9
[PID 21216] Processing 6
[PID 27244] Processing 1
[PID 21216] Processing 7
[PID 13348] Processing 4
[PID 27244] Processing 2
[PID 21216] Processing 8
[PID 13348] Processing 5
최종 결과: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
'''

 

프로세스 동기화

멀티프로세싱 환경에서 여러 프로세스가 동시에 동일한 자원에 접근할 경우, 데이터 충돌이나 레이스 컨디션이 발생할 수 있다. 이를 방지하기 위해 Python은 다음과 같은 동기화 도구를 제공한다.

Lock

Lock은 임계영역을 보호하여 동시에 하나의 프로세스만 접근하도록 제어한다.

from multiprocessing import Process, Value, Lock

def increment(count, lock=None):
    for _ in range(100000):
        if lock:
            with lock:
                count.value += 1
        else:
            count.value += 1

if __name__ == '__main__':
    # 공유 변수
    count = Value('i', 0)

    # Lock 없이 테스트
    p1 = Process(target=increment, args=(count,))
    p2 = Process(target=increment, args=(count,))
    
    p1.start(); p2.start()
    p1.join(); p2.join()
    
    print("Without Lock:", count.value)  # 예상값보다 작게 나올 수 있음 (레이스 컨디션)

    # 초기화
    count = Value('i', 0)
    lock = Lock()

    # Lock 사용
    p1 = Process(target=increment, args=(count, lock))
    p2 = Process(target=increment, args=(count, lock))
    
    p1.start(); p2.start()
    p1.join(); p2.join()

    print("With Lock:", count.value)  # 정확히 200000
    
'''
Without Lock: 101448
With Lock: 200000
'''

 

Event

Event는 하나의 프로세스에서 신호를 보내고, 다른 프로세스가 이를 기다리는 구조다.

from multiprocessing import Process, Event
import time
import random
import os

def chef(order_placed: Event, food_ready: Event):
    print(f"[Chef - PID {os.getpid()}] 주문 대기 중...")
    order_placed.wait()  # 주문 신호 올 때까지 대기

    print(f"[Chef - PID {os.getpid()}] 요리 시작!")
    cook_time = random.randint(1, 3)
    time.sleep(cook_time)
    print(f"[Chef - PID {os.getpid()}] 음식 완성! (조리시간: {cook_time}초)")

    food_ready.set()  # 음식 준비 완료 신호

def server(order_placed: Event, food_ready: Event):
    print(f"[Server - PID {os.getpid()}] 주문 받는 중...")
    time.sleep(random.randint(1, 2))
    print(f"[Server - PID {os.getpid()}] 주문 완료, 주방에 전달")

    order_placed.set()  # 주문 신호 보내기

    print(f"[Server - PID {os.getpid()}] 음식 나올 때까지 대기 중...")
    food_ready.wait()  # 음식 나올 때까지 대기

    print(f"[Server - PID {os.getpid()}] 음식 서빙 중...")
    time.sleep(1)
    print(f"[Server - PID {os.getpid()}] 서빙 완료!")

if __name__ == '__main__':
    from multiprocessing import set_start_method
    set_start_method("spawn")  # Windows/macOS 안전을 위해

    # 프로세스 간 공유 가능한 이벤트 객체 생성
    order_placed = Event()
    food_ready = Event()

    # 프로세스 생성
    chef_proc = Process(target=chef, args=(order_placed, food_ready))
    server_proc = Process(target=server, args=(order_placed, food_ready))

    # 실행
    chef_proc.start()
    server_proc.start()

    # 종료 대기
    chef_proc.join()
    server_proc.join()

    print("레스토랑 영업 종료!")
    
'''
[Chef - PID 11676] 주문 대기 중...
[Server - PID 31776] 주문 받는 중...
[Server - PID 31776] 주문 완료, 주방에 전달
[Server - PID 31776] 음식 나올 때까지 대기 중...
[Chef - PID 11676] 요리 시작!
[Chef - PID 11676] 음식 완성! (조리시간: 2초)
[Server - PID 31776] 음식 서빙 중...
[Server - PID 31776] 서빙 완료!
레스토랑 영업 종료!
'''

 

Semaphore

Semaphore는 동시 접근 가능한 프로세스 수를 제한할 수 있다.

from multiprocessing import Process, Semaphore
import time

def limited_task(sema, i):
    with sema:
        print(f"Process {i}")
        time.sleep(1)

if __name__ == '__main__':
    sema = Semaphore(2)  # 최대 2개 프로세스만 동시 진입 허용
    processes = [Process(target=limited_task, args=(sema, i)) for i in range(5)]
    for p in processes: p.start()
    for p in processes: p.join()

 

예외 처리 및 종료 제어

멀티프로세싱에서 자식 프로세스에서 발생한 예외는 기본적으로 메인 프로세스에 전파되지 않는다.

프로세스 상태 확인 및 강제 종료

from multiprocessing import Process
import time

def long_task():
    time.sleep(5)

if __name__ == '__main__':
    p = Process(target=long_task)
    p.start()
    print("Is alive:", p.is_alive())
    p.terminate()
    p.join()
    print("Exit code:", p.exitcode)
  • is_alive(): 프로세스가 살아있는지 확인한다.
  • terminate(): 프로세스를 강제로 종료한다.
  • exitcode: 종료 상태를 확인한다. 0이면 정상, -15는 강제 종료 등.

tqdm 연동 예제

multiprocessing과 tqdm은 함께 사용하기 까다롭지만, imap() 또는 수동으로 업데이트를 통해 구현할 수 있다.

from multiprocessing import Pool
from tqdm import tqdm

def compute(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        # lazy iterator로 처리되는 만큼 tqdm이 하나씩 업데이트
        results = list(tqdm(pool.imap(compute, range(10000)), total=10000))
    print(results[:10])
  • imap()이 처리 완료된 결과를 하나씩 보내주기 때문에, tqdm은 한 번씩 받을 때마다 update() 가능
  • map()은 결과를 한꺼번에 반환하므로, tqdm이 끝날 때까지 아무 것도 못 그림

댓글