본문 바로가기
python

파이썬 기초 - 멀티 스레드

by kyeongseo.oh 2025. 3. 25.

 스레드

스레드는 하나의 프로그램(프로세스) 안에서 실행되는 작업의 흐름 단위이다. 스레드는 같은 메모리 공간을 공유한다.

 CPU-bound vs I/O-bound

작업 종류 예시 특징 멀티스레드 적합성
CPU-bound 수학 계산, 이미지 필터링 CPU 연산 중심 GIL 때문에 효과 적음
I/O-bound 파일 읽기, 웹 요청, DB 쿼리 기다림 중심 멀티쓰레딩 효과 큼

I/O-bound에서 멀티쓰레딩이 유리한 이유

  • Python은 GIL(Global Interpreter Lock) 때문에 한 번에 하나의 스레드만 실행됨
  • 하지만 I/O 작업 중엔 CPU를 기다림 상태로 놔두기 때문에, 다른 스레드가 실행 가능
  • GIL의 제약이 있어도 I/O 작업에선 병렬성 효과가 나타남

기본 스레드 생성 예제

import threading
import time

def worker(name):
    print(f"{name} 시작")
    time.sleep(2)  # 2초 대기 (I/O작업 모방)
    print(f"{name} 종료")

t1 = threading.Thread(target=worker, args=("스레드A",))
t2 = threading.Thread(target=worker, args=("스레드B",))

t1.start()
t2.start()

t1.join()
t2.join()

print("모든 스레드 완료!")

'''result
스레드A 시작
스레드B 시작
스레드A 종료
스레드B 종료
모든 스레드 완료!
'''
  • 두 쓰레드가 동시에 실행되어 실행 시간 단축

주요 메서드 설명

메서드기능
start() 스레드를 시작(실행)
join() 스레드 종료될 때까지 기다림
is_alive() 스레드가 현재 실행 중인지 확인

동기화(Synchronization)가 필요한 이유

여러 스레드가 동시에 공유 데이터에 접근할 때 문제가 생길 수 있다.

  • 이를 경쟁 조건(Race Condition) 이라 부른다.
  • 공유 데이터를 보호하기 위한 장치가 동기화 도구이다.

동기화 도구 사용법과 효과 비교하기

아래 예제는 Lock을 이용하여 동기화가 있을 때와 없을 때의 차이를 보여준다.

동기화를 하지 않은 경우

import threading
import time

counter = 0

def increase():
    global counter
    for _ in range(10):
        temp = counter
        time.sleep(0.01)  # 레이스 컨디션을 더 잘 발생시키기 위한 지연
        counter = temp + 1

threads = [threading.Thread(target=increase) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"최종 카운터 값 (동기화X): {counter}")
  • 이유는 여러 스레드가 동시에 같은 데이터를 변경하면서 충돌하기 때문에 기댓값인 50이 아닌 10이 출력된다.

동기화를 사용한 경우 (Lock 이용)

import threading
import time

counter = 0
lock = threading.Lock()

def increase():
    global counter
    for _ in range(10):
        lock.acquire()    # 잠금 설정
        temp = counter
        time.sleep(0.01)  # 레이스 컨디션을 더 잘 발생시키기 위한 지연
        counter = temp + 1
        lock.release()    # 잠금 해제

threads = [threading.Thread(target=increase) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"최종 카운터 값 (동기화O): {counter}")
  • Lock을 통해 한 번에 하나의 스레드만 접근할 수 있도록 보호했기 떄문에 기댓값인 50이 출력된다.

acquire()와 release()

  • acquire()
    Lock(잠금)을 획득하여, 다른 스레드의 접근을 차단한다.
    이미 다른 스레드가 Lock을 잡고 있다면, Lock을 해제할 때까지 대기한다.
  • release()
    작업이 끝난 후 Lock을 풀어, 다른 스레드가 접근할 수 있도록 한다.

편리한 방법: with 문 활용하기

with lock:
    # 자동으로 acquire() 되고, 끝나면 release() 된다.
    temp = counter
    time.sleep(0.01) 
    counter = temp + 1

Semaphore와 Event

Semaphore (세마포어)

세마포어는 여러 스레드가 공유 자원에 접근할 때 동시 접근 수를 제어한다.

이 코드를 실행하면 아래와 같이 동작한다.

  1. 5개의 스레드가 생성된다.
  2. 세마포어가 최대 3개의 스레드만 동시에 실행되도록 제한한다.
  3. 처음 3개의 스레드가 즉시 진입하여 실행된다.
  4. 나머지 2개의 스레드는 이전 스레드 중 하나가 완료될 때까지 대기한다.
  5. 처음 3개의 스레드가 완료되면, 대기 중이던 2개의 스레드가 차례로 실행된다.
import threading
import time

semaphore = threading.Semaphore(3)  # 최대 3개 스레드 동시 접근 허용

def task(name):
    with semaphore:
        print(f"{name} 진입 \n")
        time.sleep(1)
        print(f"{name} 퇴장 \n")

# 5개의 스레드 생성 및 실행
threads = []
for i in range(5):
    t = threading.Thread(target=task, args=(f"스레드 {i}",))
    threads.append(t)
    t.start()

# 모든 스레드가 완료될 때까지 대기
for t in threads:
    t.join()

print("모든 작업 완료")

'''result
스레드 0 진입 
스레드 1 진입 
스레드 2 진입 
스레드 0 퇴장 
스레드 2 퇴장 
스레드 1 퇴장 
스레드 3 진입 
스레드 4 진입 
스레드 4 퇴장 
스레드 3 퇴장 
모든 작업 완료
'''

Event (이벤트)

Event는 기본적으로 "신호" 역할을 하며, 하나의 스레드가 다른 스레드에게 어떤 이벤트가 발생했음을 알릴 때 사용할 수 있다. 스레드 간 신호를 주고받아 작업의 순서를 유연하게 관리할 때 사용한다.

간단 예제

  1. 두 개의 Event 객체가 사용된다.
    • order_placed: 주문이 들어왔음을 알리는 신호
    • food_ready: 음식이 준비되었음을 알리는 신호
  2. 요리사(chef) 스레드:
    • order_placed.wait()로 주문이 들어올 때까지 대기한다.
    • 주문 신호를 받으면 요리를 시작하고 1~3초 동안 요리한다.
    • 요리가 완성되면 food_ready.set()으로 음식 준비 완료 신호를 보낸다.
  3. 서빙직원(server) 스레드:
    • 주문을 받는 데 1~2초가 소요된다.
    • 주문을 받으면 order_placed.set()으로 요리사에게 주문 신호를 보낸다.
    • food_ready.wait()로 음식이 준비될 때까지 대기한다.
    • 신호를 받으면 서빙을 시작하고 1초 후에 서빙을 완료한다.
  4. 메인 스레드는 join() 메서드로 두 스레드가 모두 완료될 때까지 기다린 후 "레스토랑 영업 종료!" 메시지를 출력한다.
import threading
import time
import random

# 이벤트 객체 생성
food_ready = threading.Event() # 음식이 준비되었음을 알리는 신호
order_placed = threading.Event() # 주문이 들어왔음을 알리는 신호

def chef():
    # 주문이 들어올 때까지 대기
    print("요리사: 주문 대기 중...")
    order_placed.wait()
    
    # 요리 시작
    print("요리사: 요리 시작!")
    cook_time = random.randint(1, 3)
    time.sleep(cook_time)
    print(f"요리사: 음식 완성! (조리시간: {cook_time}초)")
    
    # 음식 준비 완료 신호 발송
    food_ready.set()

def server():
    # 주문 받기
    print("서빙직원: 주문 받는 중...")
    time.sleep(random.randint(1, 2))
    print("서빙직원: 주문 완료, 주방에 전달")
    
    # 주문 신호 보내기
    order_placed.set()
    
    # 음식 준비 완료까지 대기
    print("서빙직원: 음식 나올 때까지 대기 중...")
    food_ready.wait()
    
    # 서빙
    print("서빙직원: 음식 서빙 중...")
    time.sleep(1)
    print("서빙직원: 서빙 완료!")

# 스레드 생성 및 시작
chef_thread = threading.Thread(target=chef)
server_thread = threading.Thread(target=server)

chef_thread.start()
server_thread.start()

# 스레드 종료 대기
chef_thread.join()
server_thread.join()

print("레스토랑 영업 종료!")

'''result
요리사: 주문 대기 중...
서빙직원: 주문 받는 중...
서빙직원: 주문 완료, 주방에 전달
서빙직원: 음식 나올 때까지 대기 중...
요리사: 요리 시작!
요리사: 음식 완성! (조리시간: 1초)
서빙직원: 음식 서빙 중...
서빙직원: 서빙 완료!
레스토랑 영업 종료!
'''

싱글 vs 멀티 스레드 성능 비교

import threading
import time

def io_task():
    time.sleep(1)  # I/O 작업 모방

# 멀티스레드 (빠름)
start = time.time()
threads = [threading.Thread(target=io_task) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"멀티스레드 시간: {time.time() - start:.2f}초")

# 싱글스레드 (느림)
start = time.time()
for _ in range(5):
    io_task()
print(f"싱글스레드 시간: {time.time() - start:.2f}초")

'''result
멀티스레드 시간: 1.01초
싱글스레드 시간: 5.05초
'''
 
 

 

댓글