스트리밍 (Streaming)
LangGraph는 다양한 스트리밍 모드를 제공하여 실시간으로 Agent의 실행 과정을 전달할 수 있습니다.
스트리밍 모드
| 모드 | 설명 | 사용 시나리오 |
|---|
values | 노드 실행 후 전체 상태 | 상태 추적, 디버깅 |
updates | 노드가 변경한 부분만 | 효율적 상태 업데이트 |
messages | LLM 메시지 토큰 단위 | 챗봇 UI |
custom | 사용자 정의 이벤트 | 진행률, 중간 결과 |
events | 모든 내부 이벤트 | 디버깅, 모니터링 |
values 모드
노드 실행 후 전체 상태를 반환합니다.
from langchain_core.messages import HumanMessage
for chunk in app.stream(
{"messages": [HumanMessage(content="RAG란 무엇인가요?")]},
config=config,
stream_mode="values",
):
# 매 노드 실행 후 전체 messages 상태
messages = chunk["messages"]
print(f"메시지 수: {len(messages)}")
print(messages[-1].content[:100])
updates 모드
노드가 변경한 부분만 반환합니다.
for chunk in app.stream(
{"messages": [HumanMessage(content="질문")]},
config=config,
stream_mode="updates",
):
# {노드이름: {변경된 상태}}
for node_name, update in chunk.items():
print(f"노드: {node_name}")
if "messages" in update:
print(f"새 메시지: {update['messages'][-1].content[:100]}")
messages 모드 (토큰 스트리밍)
LLM의 토큰을 하나씩 스트리밍합니다. 챗봇 UI에서 가장 많이 사용됩니다.
for msg, metadata in app.stream(
{"messages": [HumanMessage(content="LangGraph를 설명해줘")]},
config=config,
stream_mode="messages",
):
# msg: 토큰 단위의 메시지 청크
# metadata: 노드 이름, 실행 정보 등
if msg.content:
print(msg.content, end="", flush=True)
custom 모드 (사용자 정의 이벤트)
중간 진행 상황을 커스텀 이벤트로 전달합니다.
from langgraph.types import StreamWriter
def research_node(state: State, writer: StreamWriter) -> State:
"""검색 진행률을 스트리밍"""
queries = ["쿼리1", "쿼리2", "쿼리3"]
for i, query in enumerate(queries):
results = retriever.invoke(query)
# 커스텀 이벤트 전송
writer({"progress": f"검색 {i+1}/{len(queries)}: {query}"})
return {"messages": [("assistant", "검색 완료")]}
# custom 이벤트 수신
for chunk in app.stream(
{"messages": [HumanMessage(content="조사해줘")]},
config=config,
stream_mode="custom",
):
print(chunk) # {"progress": "검색 1/3: 쿼리1"}
복수 모드 동시 사용
여러 스트리밍 모드를 동시에 수신할 수 있습니다.
for event in app.stream(
{"messages": [HumanMessage(content="질문")]},
config=config,
stream_mode=["messages", "updates"],
):
mode = event[0] # "messages" 또는 "updates"
data = event[1]
if mode == "messages":
msg, metadata = data
print(msg.content, end="")
elif mode == "updates":
print(f"\n[노드 업데이트: {list(data.keys())}]")
프론트엔드 연동 패턴
Server-Sent Events (SSE)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
fastapi_app = FastAPI()
@fastapi_app.post("/chat")
async def chat(question: str):
async def event_stream():
async for msg, metadata in app.astream(
{"messages": [HumanMessage(content=question)]},
config=config,
stream_mode="messages",
):
if msg.content:
yield f"data: {msg.content}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
챗봇 UI: messages 모드로 토큰 스트리밍. 디버깅: values 또는 events 모드. 프로그레스바: custom 모드로 진행률 전송.