Skip to main content

스트리밍 (Streaming)

LangGraph는 다양한 스트리밍 모드를 제공하여 실시간으로 Agent의 실행 과정을 전달할 수 있습니다.

스트리밍 모드

모드설명사용 시나리오
values노드 실행 후 전체 상태상태 추적, 디버깅
updates노드가 변경한 부분만효율적 상태 업데이트
messagesLLM 메시지 토큰 단위챗봇 UI
custom사용자 정의 이벤트진행률, 중간 결과
events모든 내부 이벤트디버깅, 모니터링

values 모드

노드 실행 후 전체 상태를 반환합니다.
from langchain_core.messages import HumanMessage

for chunk in app.stream(
    {"messages": [HumanMessage(content="RAG란 무엇인가요?")]},
    config=config,
    stream_mode="values",
):
    # 매 노드 실행 후 전체 messages 상태
    messages = chunk["messages"]
    print(f"메시지 수: {len(messages)}")
    print(messages[-1].content[:100])

updates 모드

노드가 변경한 부분만 반환합니다.
for chunk in app.stream(
    {"messages": [HumanMessage(content="질문")]},
    config=config,
    stream_mode="updates",
):
    # {노드이름: {변경된 상태}}
    for node_name, update in chunk.items():
        print(f"노드: {node_name}")
        if "messages" in update:
            print(f"새 메시지: {update['messages'][-1].content[:100]}")

messages 모드 (토큰 스트리밍)

LLM의 토큰을 하나씩 스트리밍합니다. 챗봇 UI에서 가장 많이 사용됩니다.
for msg, metadata in app.stream(
    {"messages": [HumanMessage(content="LangGraph를 설명해줘")]},
    config=config,
    stream_mode="messages",
):
    # msg: 토큰 단위의 메시지 청크
    # metadata: 노드 이름, 실행 정보 등
    if msg.content:
        print(msg.content, end="", flush=True)

custom 모드 (사용자 정의 이벤트)

중간 진행 상황을 커스텀 이벤트로 전달합니다.
from langgraph.types import StreamWriter

def research_node(state: State, writer: StreamWriter) -> State:
    """검색 진행률을 스트리밍"""
    queries = ["쿼리1", "쿼리2", "쿼리3"]

    for i, query in enumerate(queries):
        results = retriever.invoke(query)
        # 커스텀 이벤트 전송
        writer({"progress": f"검색 {i+1}/{len(queries)}: {query}"})

    return {"messages": [("assistant", "검색 완료")]}

# custom 이벤트 수신
for chunk in app.stream(
    {"messages": [HumanMessage(content="조사해줘")]},
    config=config,
    stream_mode="custom",
):
    print(chunk)  # {"progress": "검색 1/3: 쿼리1"}

복수 모드 동시 사용

여러 스트리밍 모드를 동시에 수신할 수 있습니다.
for event in app.stream(
    {"messages": [HumanMessage(content="질문")]},
    config=config,
    stream_mode=["messages", "updates"],
):
    mode = event[0]  # "messages" 또는 "updates"
    data = event[1]

    if mode == "messages":
        msg, metadata = data
        print(msg.content, end="")
    elif mode == "updates":
        print(f"\n[노드 업데이트: {list(data.keys())}]")

프론트엔드 연동 패턴

Server-Sent Events (SSE)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

fastapi_app = FastAPI()

@fastapi_app.post("/chat")
async def chat(question: str):
    async def event_stream():
        async for msg, metadata in app.astream(
            {"messages": [HumanMessage(content=question)]},
            config=config,
            stream_mode="messages",
        ):
            if msg.content:
                yield f"data: {msg.content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
챗봇 UI: messages 모드로 토큰 스트리밍. 디버깅: values 또는 events 모드. 프로그레스바: custom 모드로 진행률 전송.