Skip to main content

스레딩과 멀티프로세싱

학습 목표

  • GIL(Global Interpreter Lock)의 개념과 영향을 이해할 수 있다
  • threading으로 I/O 바운드 작업의 동시성을 구현할 수 있다
  • multiprocessing으로 CPU 바운드 작업의 병렬 처리를 구현할 수 있다
  • concurrent.futures로 간편한 병렬 실행을 할 수 있다

왜 중요한가

ML/DL 워크로드에서 데이터 전처리는 CPU 집약적이고, API 호출이나 파일 다운로드는 I/O 집약적입니다. Python의 GIL 때문에 멀티스레딩으로는 CPU 병렬화가 불가능하지만, 적절한 도구를 선택하면 작업 시간을 크게 단축할 수 있습니다.

GIL (Global Interpreter Lock)

GIL은 한 번에 하나의 스레드만 Python 바이트코드를 실행하도록 제한하는 뮤텍스입니다.
작업 유형설명GIL 영향해결책
CPU 바운드수학 연산, 데이터 변환심각 (병렬화 불가)multiprocessing
I/O 바운드파일, 네트워크, API적음 (I/O 중 GIL 해제)threading
Python 3.13부터 실험적으로 GIL을 비활성화할 수 있는 “free-threaded” 모드가 도입되었습니다. 하지만 아직 실험 단계이므로 프로덕션에서는 기존 방법을 사용하세요.

threading - I/O 바운드 동시 처리

import threading
import time

# 기본 스레드 생성
def download_file(url: str) -> None:
    """파일 다운로드를 시뮬레이션합니다."""
    print(f"다운로드 시작: {url}")
    time.sleep(2)  # I/O 대기 시뮬레이션
    print(f"다운로드 완료: {url}")

urls = [
    "https://data.example.com/train.csv",
    "https://data.example.com/val.csv",
    "https://data.example.com/test.csv",
]

# 순차 실행: 약 6초
start = time.time()
for url in urls:
    download_file(url)
print(f"순차 실행: {time.time() - start:.1f}초")

# 병렬 실행: 약 2초
start = time.time()
threads = []
for url in urls:
    t = threading.Thread(target=download_file, args=(url,))
    threads.append(t)
    t.start()

# 모든 스레드 완료 대기
for t in threads:
    t.join()
print(f"스레드 실행: {time.time() - start:.1f}초")

스레드 안전과 Lock

import threading

# 공유 자원 접근 시 Lock 필요
counter = 0
lock = threading.Lock()

def increment(n: int) -> None:
    global counter
    for _ in range(n):
        with lock:  # Lock 획득 → 해제 자동 관리
            counter += 1

threads = [
    threading.Thread(target=increment, args=(100000,))
    for _ in range(4)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"결과: {counter}")  # 400000 (Lock 없으면 불정확)
Lock 없이 여러 스레드가 같은 변수를 수정하면 경쟁 조건(Race Condition)이 발생합니다. 공유 자원에 접근할 때는 반드시 Lock을 사용하세요.

multiprocessing - CPU 바운드 병렬 처리

import multiprocessing
import time
import math

# CPU 집약적 작업
def heavy_computation(n: int) -> float:
    """무거운 수학 연산을 수행합니다."""
    return sum(math.sqrt(i) for i in range(n))

numbers = [10_000_000] * 4

# 순차 실행
start = time.time()
results_seq = [heavy_computation(n) for n in numbers]
print(f"순차 실행: {time.time() - start:.2f}초")

# 멀티프로세싱 실행
if __name__ == "__main__":
    start = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        results_mp = pool.map(heavy_computation, numbers)
    print(f"멀티프로세싱: {time.time() - start:.2f}초")

Pool 메서드 비교

import multiprocessing

def process_item(item: tuple[int, str]) -> str:
    idx, text = item
    return f"[{idx}] {text.upper()}"

items = [(i, f"항목_{i}") for i in range(10)]

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        # map - 단일 인자, 순서 보장
        results = pool.map(process_item, items)

        # starmap - 튜플 언패킹 (여러 인자)
        def add(a: int, b: int) -> int:
            return a + b

        pairs = [(1, 2), (3, 4), (5, 6)]
        sums = pool.starmap(add, pairs)
        print(sums)  # [3, 7, 11]

        # imap - 이터레이터 반환 (메모리 효율적)
        for result in pool.imap(process_item, items):
            print(result)

        # imap_unordered - 순서 무관, 더 빠름
        for result in pool.imap_unordered(process_item, items):
            print(result)

concurrent.futures - 통합 인터페이스

concurrent.futures는 스레딩과 멀티프로세싱을 동일한 인터페이스로 사용할 수 있는 고수준 API입니다.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

# I/O 바운드 → ThreadPoolExecutor
def fetch_data(url: str) -> str:
    time.sleep(1)  # I/O 대기
    return f"데이터: {url}"

urls = [f"https://api.example.com/data/{i}" for i in range(10)]

with ThreadPoolExecutor(max_workers=5) as executor:
    # map - 간편한 병렬 실행
    results = list(executor.map(fetch_data, urls))
    print(f"완료: {len(results)}개")

# CPU 바운드 → ProcessPoolExecutor
import math

def compute(n: int) -> float:
    return sum(math.sqrt(i) for i in range(n))

if __name__ == "__main__":
    numbers = [5_000_000] * 8

    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(compute, numbers))
        print(f"결과: {len(results)}개")

Future 객체와 as_completed

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def process_task(task_id: int) -> dict:
    """가변 시간이 걸리는 작업"""
    duration = random.uniform(0.5, 3.0)
    time.sleep(duration)
    return {"task_id": task_id, "duration": duration}

with ThreadPoolExecutor(max_workers=5) as executor:
    # submit으로 개별 작업 제출
    futures = {
        executor.submit(process_task, i): i
        for i in range(10)
    }

    # 완료되는 순서대로 처리 (순서 무관)
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result(timeout=5)
            print(f"태스크 {result['task_id']}: {result['duration']:.2f}초")
        except Exception as e:
            print(f"태스크 {task_id} 실패: {e}")

동시성 선택 가이드

도구적합한 작업GIL 우회메모리 공유
threadingI/O 바운드 (파일, 네트워크)불필요공유 (Lock 필요)
multiprocessingCPU 바운드 (연산)별도 프로세스비공유 (직렬화)
ThreadPoolExecutorI/O 바운드 (간편)불필요공유
ProcessPoolExecutorCPU 바운드 (간편)별도 프로세스비공유
asyncio대량 I/O (수천 연결)불필요공유

AI/ML에서의 활용

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from pathlib import Path
import json

# 1. 데이터 전처리 병렬화 (CPU 바운드)
def preprocess_image(filepath: str) -> dict:
    """이미지 전처리를 수행합니다."""
    # 실제로는 PIL/OpenCV로 이미지 처리
    return {
        "path": filepath,
        "processed": True,
        "size": Path(filepath).stat().st_size if Path(filepath).exists() else 0,
    }

def parallel_preprocess(filepaths: list[str], workers: int = 4) -> list[dict]:
    """여러 이미지를 병렬로 전처리합니다."""
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(preprocess_image, filepaths))
    return results

# 2. API 호출 병렬화 (I/O 바운드)
def call_llm_api(prompt: str) -> str:
    """LLM API를 호출합니다."""
    import time
    time.sleep(1)  # API 대기 시뮬레이션
    return f"응답: {prompt[:20]}..."

def batch_llm_calls(prompts: list[str], workers: int = 10) -> list[str]:
    """여러 프롬프트를 병렬로 API 호출합니다."""
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(call_llm_api, prompts))
    return results

# 3. 대용량 파일 병렬 로딩
def load_json_file(filepath: str) -> dict:
    """JSON 파일을 로딩합니다."""
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)

def parallel_load(filepaths: list[str], workers: int = 8) -> list[dict]:
    """여러 JSON 파일을 병렬로 로딩합니다."""
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(load_json_file, filepaths))
    return results
multiprocessing은 자식 프로세스를 생성할 때 메인 모듈을 다시 import합니다. if __name__ == "__main__" 가드가 없으면 자식 프로세스가 다시 Pool을 생성하여 무한 재귀가 발생합니다. 특히 Windows와 macOS(spawn 방식)에서 필수입니다.
CPU 바운드 작업은 os.cpu_count() (CPU 코어 수)만큼, I/O 바운드 작업은 그보다 많이 설정할 수 있습니다. 일반적으로 I/O 바운드는 코어 수의 2~5배, CPU 바운드는 코어 수와 동일하게 설정합니다.

체크리스트

  • GIL이 무엇이고 왜 존재하는지 설명할 수 있다
  • I/O 바운드와 CPU 바운드 작업을 구분할 수 있다
  • threading으로 I/O 바운드 작업을 동시 처리할 수 있다
  • multiprocessing으로 CPU 바운드 작업을 병렬 처리할 수 있다
  • concurrent.futuresThreadPoolExecutorProcessPoolExecutor를 사용할 수 있다

다음 문서