asyncio - 비동기 프로그래밍
학습 목표
async/await문법으로 코루틴을 정의하고 실행할 수 있다- 이벤트 루프의 동작 원리를 이해할 수 있다
asyncio.gather와asyncio.create_task로 동시 실행을 구현할 수 있다aiohttp로 비동기 HTTP 요청을 수행할 수 있다
왜 중요한가
비동기 프로그래밍(Asynchronous Programming)은 수백~수천 개의 I/O 작업을 단일 스레드에서 효율적으로 처리하는 패턴입니다. LLM API 배치 호출, 대규모 데이터 수집, 실시간 스트리밍 등에서asyncio는 스레딩보다 가볍고 확장성이 뛰어납니다. FastAPI, LangChain 등 현대 Python 프레임워크의 핵심 기반입니다.
동기 vs 비동기
async/await 기본
코루틴 정의와 실행
Copy
import asyncio
# async def로 코루틴 함수 정의
async def greet(name: str) -> str:
print(f"인사 시작: {name}")
await asyncio.sleep(1) # 비동기 대기 (1초)
print(f"인사 완료: {name}")
return f"안녕하세요, {name}님!"
# 코루틴 실행
async def main():
result = await greet("파이썬")
print(result)
# 이벤트 루프 실행
asyncio.run(main())
await는 async def 함수 내부에서만 사용할 수 있습니다. 일반 함수에서 await를 호출하면 SyntaxError가 발생합니다.동시 실행 - gather
Copy
import asyncio
import time
async def fetch_data(name: str, delay: float) -> str:
"""데이터를 가져오는 비동기 함수"""
print(f" [{name}] 요청 시작")
await asyncio.sleep(delay)
print(f" [{name}] 요청 완료")
return f"{name}: 데이터"
async def main():
# 순차 실행: 약 6초
start = time.time()
r1 = await fetch_data("API-1", 2)
r2 = await fetch_data("API-2", 2)
r3 = await fetch_data("API-3", 2)
print(f"순차 실행: {time.time() - start:.1f}초")
# 동시 실행: 약 2초
start = time.time()
results = await asyncio.gather(
fetch_data("API-1", 2),
fetch_data("API-2", 2),
fetch_data("API-3", 2),
)
print(f"동시 실행: {time.time() - start:.1f}초")
print(f"결과: {results}")
asyncio.run(main())
태스크 생성과 관리
Copy
import asyncio
async def process(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name} 완료"
async def main():
# create_task로 태스크 생성 (즉시 스케줄링)
task1 = asyncio.create_task(process("작업A", 2))
task2 = asyncio.create_task(process("작업B", 1))
task3 = asyncio.create_task(process("작업C", 3))
# 다른 작업을 하는 동안 태스크가 백그라운드에서 실행됨
print("태스크 실행 중...")
# 결과 수집
result1 = await task1
result2 = await task2
result3 = await task3
print(result1, result2, result3)
asyncio.run(main())
에러 처리와 타임아웃
Copy
import asyncio
async def risky_operation(name: str) -> str:
await asyncio.sleep(1)
if name == "fail":
raise ValueError(f"'{name}' 작업 실패!")
return f"{name} 성공"
async def main():
# gather에서 에러 처리
results = await asyncio.gather(
risky_operation("task1"),
risky_operation("fail"),
risky_operation("task3"),
return_exceptions=True, # 예외를 결과로 반환
)
for r in results:
if isinstance(r, Exception):
print(f"에러: {r}")
else:
print(f"성공: {r}")
# 타임아웃 설정
try:
result = await asyncio.wait_for(
risky_operation("slow_task"),
timeout=0.5, # 0.5초 제한
)
except asyncio.TimeoutError:
print("작업 시간 초과!")
# TaskGroup (Python 3.11+) - 구조적 동시성
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(risky_operation("task1"))
task2 = tg.create_task(risky_operation("task2"))
# 모든 태스크가 완료되면 여기에 도달
print(task1.result(), task2.result())
except* ValueError as eg:
for exc in eg.exceptions:
print(f"그룹 에러: {exc}")
asyncio.run(main())
Python 3.11의
TaskGroup은 gather보다 안전한 구조적 동시성을 제공합니다. 하나의 태스크가 실패하면 나머지 태스크도 자동으로 취소됩니다.Semaphore로 동시성 제한
Copy
import asyncio
async def call_api(sem: asyncio.Semaphore, task_id: int) -> str:
"""동시 요청 수를 제한하면서 API를 호출합니다."""
async with sem: # Semaphore 획득 (동시 제한)
print(f" 태스크 {task_id}: API 호출 시작")
await asyncio.sleep(1) # API 대기
print(f" 태스크 {task_id}: API 호출 완료")
return f"결과_{task_id}"
async def main():
# 동시에 최대 3개만 실행
sem = asyncio.Semaphore(3)
tasks = [call_api(sem, i) for i in range(10)]
results = await asyncio.gather(*tasks)
print(f"전체 완료: {len(results)}개")
asyncio.run(main())
aiohttp로 비동기 HTTP
Copy
pip install aiohttp
Copy
import asyncio
import aiohttp
async def fetch_url(
session: aiohttp.ClientSession,
url: str,
) -> dict:
"""URL에서 JSON 데이터를 비동기로 가져옵니다."""
async with session.get(url) as response:
data = await response.json()
return {"url": url, "status": response.status, "data": data}
async def main():
urls = [
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3",
]
# 세션을 공유하여 연결 풀 활용
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for r in results:
print(f"[{r['status']}] {r['url']}")
asyncio.run(main())
비동기 이터레이터와 제너레이터
Copy
import asyncio
# 비동기 제너레이터
async def async_range(start: int, end: int, delay: float = 0.1):
"""비동기로 숫자를 생성합니다."""
for i in range(start, end):
await asyncio.sleep(delay)
yield i
# 비동기 for 문
async def main():
# async for로 순회
async for num in async_range(0, 5, 0.2):
print(f"받음: {num}")
# 비동기 컴프리헨션
values = [num async for num in async_range(0, 5, 0.1)]
print(f"수집: {values}")
asyncio.run(main())
AI/ML에서의 활용
Copy
import asyncio
import aiohttp
import json
from typing import Any
# LLM API 배치 호출
async def call_llm(
session: aiohttp.ClientSession,
sem: asyncio.Semaphore,
prompt: str,
api_url: str = "https://api.openai.com/v1/chat/completions",
api_key: str = "sk-xxx",
) -> dict[str, Any]:
"""LLM API를 비동기로 호출합니다."""
async with sem: # 동시 요청 수 제한
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
payload = {
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.7,
}
try:
async with session.post(
api_url, headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=30)
) as response:
result = await response.json()
return {"prompt": prompt[:50], "response": result, "status": response.status}
except Exception as e:
return {"prompt": prompt[:50], "error": str(e), "status": 0}
async def batch_llm_calls(
prompts: list[str],
max_concurrent: int = 5,
) -> list[dict]:
"""여러 프롬프트를 비동기로 배치 호출합니다."""
sem = asyncio.Semaphore(max_concurrent)
async with aiohttp.ClientSession() as session:
tasks = [call_llm(session, sem, prompt) for prompt in prompts]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
r if not isinstance(r, Exception) else {"error": str(r)}
for r in results
]
# 비동기 데이터 파이프라인
async def async_data_pipeline(
urls: list[str],
process_fn,
max_concurrent: int = 10,
) -> list[Any]:
"""비동기 데이터 수집 + 처리 파이프라인"""
sem = asyncio.Semaphore(max_concurrent)
results = []
async def fetch_and_process(session, url):
async with sem:
async with session.get(url) as response:
data = await response.text()
processed = process_fn(data)
return processed
async with aiohttp.ClientSession() as session:
tasks = [fetch_and_process(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in results if not isinstance(r, Exception)]
# 실행 예시
# asyncio.run(batch_llm_calls(["질문1", "질문2", "질문3"]))
asyncio와 threading은 어떻게 다른가요?
asyncio와 threading은 어떻게 다른가요?
threading은 OS 스레드를 사용하여 동시성을 구현하지만, asyncio는 단일 스레드에서 이벤트 루프를 통해 협력적 멀티태스킹을 수행합니다. asyncio는 스레드보다 가볍고(수천 개 코루틴 가능), 컨텍스트 스위칭 비용이 없으며, Lock 없이 안전합니다. 단, 모든 I/O가 비동기를 지원해야 합니다.Jupyter에서 asyncio를 사용하려면?
Jupyter에서 asyncio를 사용하려면?
Jupyter는 이미 이벤트 루프가 실행 중이므로
asyncio.run()이 동작하지 않습니다. 대신 await main()을 직접 호출하거나 nest_asyncio 패키지를 사용하세요: import nest_asyncio; nest_asyncio.apply().체크리스트
-
async def와await로 코루틴을 정의하고 실행할 수 있다 -
asyncio.gather로 여러 코루틴을 동시에 실행할 수 있다 -
Semaphore로 동시 실행 수를 제한할 수 있다 -
aiohttp로 비동기 HTTP 요청을 수행할 수 있다 - 동기/비동기/멀티스레딩의 차이를 설명할 수 있다

