Skip to main content

asyncio - 비동기 프로그래밍

학습 목표

  • async/await 문법으로 코루틴을 정의하고 실행할 수 있다
  • 이벤트 루프의 동작 원리를 이해할 수 있다
  • asyncio.gatherasyncio.create_task로 동시 실행을 구현할 수 있다
  • aiohttp로 비동기 HTTP 요청을 수행할 수 있다

왜 중요한가

비동기 프로그래밍(Asynchronous Programming)은 수백~수천 개의 I/O 작업을 단일 스레드에서 효율적으로 처리하는 패턴입니다. LLM API 배치 호출, 대규모 데이터 수집, 실시간 스트리밍 등에서 asyncio는 스레딩보다 가볍고 확장성이 뛰어납니다. FastAPI, LangChain 등 현대 Python 프레임워크의 핵심 기반입니다.

동기 vs 비동기

async/await 기본

1

코루틴 정의와 실행

import asyncio

# async def로 코루틴 함수 정의
async def greet(name: str) -> str:
    print(f"인사 시작: {name}")
    await asyncio.sleep(1)  # 비동기 대기 (1초)
    print(f"인사 완료: {name}")
    return f"안녕하세요, {name}님!"

# 코루틴 실행
async def main():
    result = await greet("파이썬")
    print(result)

# 이벤트 루프 실행
asyncio.run(main())
awaitasync def 함수 내부에서만 사용할 수 있습니다. 일반 함수에서 await를 호출하면 SyntaxError가 발생합니다.
2

동시 실행 - gather

import asyncio
import time

async def fetch_data(name: str, delay: float) -> str:
    """데이터를 가져오는 비동기 함수"""
    print(f"  [{name}] 요청 시작")
    await asyncio.sleep(delay)
    print(f"  [{name}] 요청 완료")
    return f"{name}: 데이터"

async def main():
    # 순차 실행: 약 6초
    start = time.time()
    r1 = await fetch_data("API-1", 2)
    r2 = await fetch_data("API-2", 2)
    r3 = await fetch_data("API-3", 2)
    print(f"순차 실행: {time.time() - start:.1f}초")

    # 동시 실행: 약 2초
    start = time.time()
    results = await asyncio.gather(
        fetch_data("API-1", 2),
        fetch_data("API-2", 2),
        fetch_data("API-3", 2),
    )
    print(f"동시 실행: {time.time() - start:.1f}초")
    print(f"결과: {results}")

asyncio.run(main())
3

태스크 생성과 관리

import asyncio

async def process(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} 완료"

async def main():
    # create_task로 태스크 생성 (즉시 스케줄링)
    task1 = asyncio.create_task(process("작업A", 2))
    task2 = asyncio.create_task(process("작업B", 1))
    task3 = asyncio.create_task(process("작업C", 3))

    # 다른 작업을 하는 동안 태스크가 백그라운드에서 실행됨
    print("태스크 실행 중...")

    # 결과 수집
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(result1, result2, result3)

asyncio.run(main())

에러 처리와 타임아웃

import asyncio

async def risky_operation(name: str) -> str:
    await asyncio.sleep(1)
    if name == "fail":
        raise ValueError(f"'{name}' 작업 실패!")
    return f"{name} 성공"

async def main():
    # gather에서 에러 처리
    results = await asyncio.gather(
        risky_operation("task1"),
        risky_operation("fail"),
        risky_operation("task3"),
        return_exceptions=True,  # 예외를 결과로 반환
    )

    for r in results:
        if isinstance(r, Exception):
            print(f"에러: {r}")
        else:
            print(f"성공: {r}")

    # 타임아웃 설정
    try:
        result = await asyncio.wait_for(
            risky_operation("slow_task"),
            timeout=0.5,  # 0.5초 제한
        )
    except asyncio.TimeoutError:
        print("작업 시간 초과!")

    # TaskGroup (Python 3.11+) - 구조적 동시성
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(risky_operation("task1"))
            task2 = tg.create_task(risky_operation("task2"))
        # 모든 태스크가 완료되면 여기에 도달
        print(task1.result(), task2.result())
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"그룹 에러: {exc}")

asyncio.run(main())
Python 3.11의 TaskGroupgather보다 안전한 구조적 동시성을 제공합니다. 하나의 태스크가 실패하면 나머지 태스크도 자동으로 취소됩니다.

Semaphore로 동시성 제한

import asyncio

async def call_api(sem: asyncio.Semaphore, task_id: int) -> str:
    """동시 요청 수를 제한하면서 API를 호출합니다."""
    async with sem:  # Semaphore 획득 (동시 제한)
        print(f"  태스크 {task_id}: API 호출 시작")
        await asyncio.sleep(1)  # API 대기
        print(f"  태스크 {task_id}: API 호출 완료")
        return f"결과_{task_id}"

async def main():
    # 동시에 최대 3개만 실행
    sem = asyncio.Semaphore(3)

    tasks = [call_api(sem, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"전체 완료: {len(results)}개")

asyncio.run(main())

aiohttp로 비동기 HTTP

pip install aiohttp
import asyncio
import aiohttp

async def fetch_url(
    session: aiohttp.ClientSession,
    url: str,
) -> dict:
    """URL에서 JSON 데이터를 비동기로 가져옵니다."""
    async with session.get(url) as response:
        data = await response.json()
        return {"url": url, "status": response.status, "data": data}

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
    ]

    # 세션을 공유하여 연결 풀 활용
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    for r in results:
        print(f"[{r['status']}] {r['url']}")

asyncio.run(main())

비동기 이터레이터와 제너레이터

import asyncio

# 비동기 제너레이터
async def async_range(start: int, end: int, delay: float = 0.1):
    """비동기로 숫자를 생성합니다."""
    for i in range(start, end):
        await asyncio.sleep(delay)
        yield i

# 비동기 for 문
async def main():
    # async for로 순회
    async for num in async_range(0, 5, 0.2):
        print(f"받음: {num}")

    # 비동기 컴프리헨션
    values = [num async for num in async_range(0, 5, 0.1)]
    print(f"수집: {values}")

asyncio.run(main())

AI/ML에서의 활용

import asyncio
import aiohttp
import json
from typing import Any

# LLM API 배치 호출
async def call_llm(
    session: aiohttp.ClientSession,
    sem: asyncio.Semaphore,
    prompt: str,
    api_url: str = "https://api.openai.com/v1/chat/completions",
    api_key: str = "sk-xxx",
) -> dict[str, Any]:
    """LLM API를 비동기로 호출합니다."""
    async with sem:  # 동시 요청 수 제한
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.7,
        }

        try:
            async with session.post(
                api_url, headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                result = await response.json()
                return {"prompt": prompt[:50], "response": result, "status": response.status}
        except Exception as e:
            return {"prompt": prompt[:50], "error": str(e), "status": 0}

async def batch_llm_calls(
    prompts: list[str],
    max_concurrent: int = 5,
) -> list[dict]:
    """여러 프롬프트를 비동기로 배치 호출합니다."""
    sem = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        tasks = [call_llm(session, sem, prompt) for prompt in prompts]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    return [
        r if not isinstance(r, Exception) else {"error": str(r)}
        for r in results
    ]

# 비동기 데이터 파이프라인
async def async_data_pipeline(
    urls: list[str],
    process_fn,
    max_concurrent: int = 10,
) -> list[Any]:
    """비동기 데이터 수집 + 처리 파이프라인"""
    sem = asyncio.Semaphore(max_concurrent)
    results = []

    async def fetch_and_process(session, url):
        async with sem:
            async with session.get(url) as response:
                data = await response.text()
                processed = process_fn(data)
                return processed

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_and_process(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    return [r for r in results if not isinstance(r, Exception)]

# 실행 예시
# asyncio.run(batch_llm_calls(["질문1", "질문2", "질문3"]))
threading은 OS 스레드를 사용하여 동시성을 구현하지만, asyncio는 단일 스레드에서 이벤트 루프를 통해 협력적 멀티태스킹을 수행합니다. asyncio는 스레드보다 가볍고(수천 개 코루틴 가능), 컨텍스트 스위칭 비용이 없으며, Lock 없이 안전합니다. 단, 모든 I/O가 비동기를 지원해야 합니다.
Jupyter는 이미 이벤트 루프가 실행 중이므로 asyncio.run()이 동작하지 않습니다. 대신 await main()을 직접 호출하거나 nest_asyncio 패키지를 사용하세요: import nest_asyncio; nest_asyncio.apply().

체크리스트

  • async defawait로 코루틴을 정의하고 실행할 수 있다
  • asyncio.gather로 여러 코루틴을 동시에 실행할 수 있다
  • Semaphore로 동시 실행 수를 제한할 수 있다
  • aiohttp로 비동기 HTTP 요청을 수행할 수 있다
  • 동기/비동기/멀티스레딩의 차이를 설명할 수 있다

다음 문서