import threading# 공유 자원 접근 시 Lock 필요counter = 0lock = threading.Lock()def increment(n: int) -> None: global counter for _ in range(n): with lock: # Lock 획득 → 해제 자동 관리 counter += 1threads = [ threading.Thread(target=increment, args=(100000,)) for _ in range(4)]for t in threads: t.start()for t in threads: t.join()print(f"결과: {counter}") # 400000 (Lock 없으면 불정확)
Lock 없이 여러 스레드가 같은 변수를 수정하면 경쟁 조건(Race Condition)이 발생합니다. 공유 자원에 접근할 때는 반드시 Lock을 사용하세요.
from concurrent.futures import ThreadPoolExecutor, as_completedimport timeimport randomdef process_task(task_id: int) -> dict: """가변 시간이 걸리는 작업""" duration = random.uniform(0.5, 3.0) time.sleep(duration) return {"task_id": task_id, "duration": duration}with ThreadPoolExecutor(max_workers=5) as executor: # submit으로 개별 작업 제출 futures = { executor.submit(process_task, i): i for i in range(10) } # 완료되는 순서대로 처리 (순서 무관) for future in as_completed(futures): task_id = futures[future] try: result = future.result(timeout=5) print(f"태스크 {result['task_id']}: {result['duration']:.2f}초") except Exception as e: print(f"태스크 {task_id} 실패: {e}")
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutorfrom pathlib import Pathimport json# 1. 데이터 전처리 병렬화 (CPU 바운드)def preprocess_image(filepath: str) -> dict: """이미지 전처리를 수행합니다.""" # 실제로는 PIL/OpenCV로 이미지 처리 return { "path": filepath, "processed": True, "size": Path(filepath).stat().st_size if Path(filepath).exists() else 0, }def parallel_preprocess(filepaths: list[str], workers: int = 4) -> list[dict]: """여러 이미지를 병렬로 전처리합니다.""" with ProcessPoolExecutor(max_workers=workers) as executor: results = list(executor.map(preprocess_image, filepaths)) return results# 2. API 호출 병렬화 (I/O 바운드)def call_llm_api(prompt: str) -> str: """LLM API를 호출합니다.""" import time time.sleep(1) # API 대기 시뮬레이션 return f"응답: {prompt[:20]}..."def batch_llm_calls(prompts: list[str], workers: int = 10) -> list[str]: """여러 프롬프트를 병렬로 API 호출합니다.""" with ThreadPoolExecutor(max_workers=workers) as executor: results = list(executor.map(call_llm_api, prompts)) return results# 3. 대용량 파일 병렬 로딩def load_json_file(filepath: str) -> dict: """JSON 파일을 로딩합니다.""" with open(filepath, "r", encoding="utf-8") as f: return json.load(f)def parallel_load(filepaths: list[str], workers: int = 8) -> list[dict]: """여러 JSON 파일을 병렬로 로딩합니다.""" with ThreadPoolExecutor(max_workers=workers) as executor: results = list(executor.map(load_json_file, filepaths)) return results
multiprocessing에서 if __name__ == '__main__'이 필요한 이유는?
multiprocessing은 자식 프로세스를 생성할 때 메인 모듈을 다시 import합니다. if __name__ == "__main__" 가드가 없으면 자식 프로세스가 다시 Pool을 생성하여 무한 재귀가 발생합니다. 특히 Windows와 macOS(spawn 방식)에서 필수입니다.
워커 수는 몇 개가 적당한가요?
CPU 바운드 작업은 os.cpu_count() (CPU 코어 수)만큼, I/O 바운드 작업은 그보다 많이 설정할 수 있습니다. 일반적으로 I/O 바운드는 코어 수의 2~5배, CPU 바운드는 코어 수와 동일하게 설정합니다.