Why Streaming Matters

LLM responses can take seconds to generate. Without streaming, users stare at a blank screen. With streaming, they see words appear in real-time - a much better experience.

  • Perceived performance: Users feel the app is faster
  • Engagement: Reading as it generates keeps attention
  • Early cancellation: Users can stop if response is wrong

OpenAI Streaming

from openai import OpenAI

client = OpenAI()

# Enable streaming
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story"}],
    stream=True
)

# Process chunks
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

# Async version
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def stream_response():
    stream = await async_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a story"}],
        stream=True
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

FastAPI Streaming Endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def stream_chat(request: dict):
    async def generate():
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=request["messages"],
            stream=True
        )

        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                # Send as Server-Sent Event
                yield f"data: {json.dumps({'content': content})}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

Frontend Consumption

// JavaScript EventSource for SSE
const eventSource = new EventSource('/chat/stream');

eventSource.onmessage = (event) => {
    if (event.data === '[DONE]') {
        eventSource.close();
        return;
    }

    const data = JSON.parse(event.data);
    document.getElementById('response').textContent += data.content;
};

// Or using fetch with ReadableStream
async function streamChat(messages) {
    const response = await fetch('/chat/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const text = decoder.decode(value);
        // Parse SSE format
        const lines = text.split('\n');
        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.slice(6);
                if (data !== '[DONE]') {
                    const { content } = JSON.parse(data);
                    updateUI(content);
                }
            }
        }
    }
}

LangChain Streaming

from langchain_openai import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# Method 1: Callbacks
llm = ChatOpenAI(
    model="gpt-4",
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)
llm.invoke("Tell me a joke")

# Method 2: Stream method
llm = ChatOpenAI(model="gpt-4")

for chunk in llm.stream("Tell me a story"):
    print(chunk.content, end="", flush=True)

# Method 3: Async streaming
async for chunk in llm.astream("Tell me a story"):
    print(chunk.content, end="", flush=True)

# Streaming with chains
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("Write about {topic}")
chain = prompt | llm | StrOutputParser()

for chunk in chain.stream({"topic": "AI"}):
    print(chunk, end="", flush=True)

Streaming with Function Calling

from openai import OpenAI
import json

client = OpenAI()

tools = [{"type": "function", "function": {...}}]

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[...],
    tools=tools,
    stream=True
)

# Accumulate function call arguments
function_args = ""
function_name = ""

for chunk in stream:
    delta = chunk.choices[0].delta

    # Check for function call
    if delta.tool_calls:
        tool_call = delta.tool_calls[0]
        if tool_call.function.name:
            function_name = tool_call.function.name
        if tool_call.function.arguments:
            function_args += tool_call.function.arguments

    # Check for content
    if delta.content:
        print(delta.content, end="", flush=True)

# After stream ends, execute function if called
if function_name:
    args = json.loads(function_args)
    result = execute_function(function_name, args)

WebSocket Streaming

from fastapi import FastAPI, WebSocket
from openai import OpenAI

app = FastAPI()
client = OpenAI()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()

    while True:
        # Receive message
        data = await websocket.receive_json()

        # Stream response
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=data["messages"],
            stream=True
        )

        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                await websocket.send_json({
                    "type": "chunk",
                    "content": content
                })

        await websocket.send_json({"type": "done"})

Best Practices

  • Buffer appropriately: Send word/sentence chunks, not character by character
  • Handle errors: Stream errors gracefully to the client
  • Timeout handling: Set appropriate timeouts for long responses
  • Memory management: Don't accumulate entire response in memory
  • Cancellation: Allow users to cancel ongoing streams

Build Real-Time AI Applications

Our Agentic AI program covers streaming and production deployment. Learn to build responsive AI experiences.

Explore Agentic AI Program

Related Articles