Streaming
Streaming lets a model return its output incrementally, token by token, instead of waiting for the complete response. With native SDKs, the invocation and response formats vary by provider and model. Langchain simplifies this by offering a unified stream method that works the same way across providers.
Native SDKs
OpenAI - ChatGPT
from gen_ai_hub.proxy.native.openai import chat

def stream_openai(prompt, model_name='gpt-4o-mini'):
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt}
    ]
    kwargs = dict(model_name=model_name, messages=messages, max_tokens=500, stream=True)
    stream = chat.completions.create(**kwargs)
    # Each chunk carries an incremental delta; print it as soon as it arrives.
    for chunk in stream:
        if chunk.choices:
            content = chunk.choices[0].delta.content
            if content:
                print(content, end='')

stream_openai("Why is the sky blue?")
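Downstream code often needs the complete text after streaming finishes, for logging or further processing. A minimal sketch of collecting the deltas while streaming, using the same chat module as above (stream_openai_collect is a hypothetical helper name):

from gen_ai_hub.proxy.native.openai import chat

def stream_openai_collect(prompt, model_name='gpt-4o-mini'):
    stream = chat.completions.create(
        model_name=model_name,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=500,
        stream=True,
    )
    parts = []
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            parts.append(chunk.choices[0].delta.content)
    return "".join(parts)  # the complete response text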
Structured model outputs
from gen_ai_hub.proxy import get_proxy_client
from gen_ai_hub.proxy.native.openai import chat, OpenAI
from pydantic import BaseModel

class Person(BaseModel):
    name: str
    age: int

messages = [{"role": "user", "content": "Tell me about John Doe, aged 30."}]

def stream_openai_structured_outputs(messages, response_object, model_name):
    # For more information, see:
    # https://www.github.com/openai/openai-python#with_streaming_response and
    # https://platform.openai.com/docs/guides/structured-outputs#streaming
    with chat.completions.with_streaming_response.parse(
        model=model_name,
        messages=messages,
        response_format=response_object
    ) as stream:
        response = stream.parse()  # consumes the stream chunks and returns the final response
        return response.choices[0].message.parsed

print(stream_openai_structured_outputs(messages, Person, "gpt-4o-mini"))

def stream_beta_openai_structured_outputs(messages, response_object, model_name):
    client = OpenAI(proxy_client=get_proxy_client())  # avoid shadowing the imported chat module
    with client.beta.chat.completions.stream(
        model=model_name,
        messages=messages,
        response_format=response_object
    ) as stream:
        response = stream.get_final_completion()  # waits for the full response to be received
        return response.choices[0].message.parsed

print(stream_beta_openai_structured_outputs(messages, Person, "gpt-4o-mini"))
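The two helpers differ in when the result becomes available: with_streaming_response.parse consumes the chunks and only hands back the final parsed object, while the beta stream context manager also exposes the incremental events. A minimal sketch of iterating those events, reusing the imports from the examples above and assuming the standard openai-python event types, where content.delta events carry partial JSON text (stream_structured_events is a hypothetical helper name):

def stream_structured_events(messages, response_object, model_name):
    client = OpenAI(proxy_client=get_proxy_client())
    with client.beta.chat.completions.stream(
        model=model_name,
        messages=messages,
        response_format=response_object,
    ) as stream:
        for event in stream:
            if event.type == "content.delta":
                print(event.delta, end="")  # partial JSON as it is generated
        return stream.get_final_completion().choices[0].message.parsed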
Google - GenAI
from gen_ai_hub.proxy.core.proxy_clients import get_proxy_client
from gen_ai_hub.proxy.native.google_genai.clients import Client
from google.genai.types import GenerateContentConfig

def stream_genai(prompt, model_name='gemini-2.0-flash'):
    proxy_client = get_proxy_client('gen-ai-hub')
    client = Client(proxy_client=proxy_client)
    response = client.models.generate_content_stream(
        model=model_name,
        contents=prompt,
        config=GenerateContentConfig(max_output_tokens=500),
    )
    for chunk in response:
        if chunk.text:  # guard against chunks that carry no text content
            print(chunk.text, end='')

stream_genai("Why is the sky blue?")
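GenerateContentConfig accepts generation parameters beyond max_output_tokens. A minimal sketch using the google-genai config fields temperature and system_instruction; whether system_instruction is forwarded by the proxy is an assumption here:

from gen_ai_hub.proxy.core.proxy_clients import get_proxy_client
from gen_ai_hub.proxy.native.google_genai.clients import Client
from google.genai.types import GenerateContentConfig

client = Client(proxy_client=get_proxy_client('gen-ai-hub'))
config = GenerateContentConfig(
    max_output_tokens=500,
    temperature=0.7,
    system_instruction="You are a concise physics tutor.",  # assumed to pass through the proxy
)
for chunk in client.models.generate_content_stream(
    model='gemini-2.0-flash', contents="Why is the sky blue?", config=config
):
    if chunk.text:
        print(chunk.text, end='')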
Anthropic - Claude
import json
from gen_ai_hub.proxy.native.amazon.clients import Session

def stream_claude(prompt, model_name='anthropic--claude-3-haiku'):
    bedrock = Session().client(model_name=model_name)
    body = json.dumps({
        "max_tokens": 500,
        "messages": [{"role": "user", "content": prompt}],
        "anthropic_version": "bedrock-2023-05-31"
    })
    response = bedrock.invoke_model_with_response_stream(body=body)
    stream = response.get("body")
    # Claude emits typed events; the text arrives in content_block_delta events.
    for event in stream:
        chunk = json.loads(event["chunk"]["bytes"])
        if chunk["type"] == "content_block_delta":
            print(chunk["delta"].get("text", ""), end="")

stream_claude("Why is the sky blue?")
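Besides text deltas, the Claude event stream carries metadata: the closing message_delta event reports the stop reason and output-token usage. A minimal sketch of reading it, assuming Anthropic's standard Bedrock event format (stream_claude_with_usage is a hypothetical helper name):

import json
from gen_ai_hub.proxy.native.amazon.clients import Session

def stream_claude_with_usage(prompt, model_name='anthropic--claude-3-haiku'):
    bedrock = Session().client(model_name=model_name)
    body = json.dumps({
        "max_tokens": 500,
        "messages": [{"role": "user", "content": prompt}],
        "anthropic_version": "bedrock-2023-05-31"
    })
    response = bedrock.invoke_model_with_response_stream(body=body)
    for event in response.get("body"):
        chunk = json.loads(event["chunk"]["bytes"])
        if chunk["type"] == "content_block_delta":
            print(chunk["delta"].get("text", ""), end="")
        elif chunk["type"] == "message_delta":
            # stop reason and usage arrive at the end of the stream
            print(f"\nstop_reason={chunk['delta'].get('stop_reason')}, "
                  f"output_tokens={chunk.get('usage', {}).get('output_tokens')}")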
Amazon - Bedrock
import json
from gen_ai_hub.proxy.native.amazon.clients import Session

def stream_bedrock(prompt, model_name='amazon--nova-pro'):
    bedrock = Session().client(model_name=model_name)
    body = json.dumps({
        "schemaVersion": "messages-v1",
        "messages": [{"role": "user", "content": [{"text": prompt}]}],
        "system": [{"text": "Act as a creative writing assistant."}],
        "inferenceConfig": {"maxTokens": 500, "topP": 0.9, "topK": 20, "temperature": 0.7},
    })
    response = bedrock.invoke_model_with_response_stream(body=body)
    stream = response.get("body")
    chunk_count = 0
    answer = ""
    if stream:
        # Nova wraps text deltas in contentBlockDelta events; collect them into the final answer.
        for event in stream:
            chunk = event.get("chunk")
            if chunk:
                chunk_json = json.loads(chunk.get("bytes").decode())
                content_block_delta = chunk_json.get("contentBlockDelta")
                if content_block_delta:
                    chunk_count += 1
                    answer += content_block_delta.get("delta").get("text")
        print(f"Total chunks: {chunk_count}")
        print("Final answer:", answer)
    else:
        print("No response stream received.")

stream_bedrock("Why is the sky blue?")
Langchain
from gen_ai_hub.proxy.langchain import init_llm

def stream_langchain(prompt, model_name):
    llm = init_llm(model_name=model_name, max_tokens=500)
    # The same stream() call works for every provider behind the proxy.
    for chunk in llm.stream(prompt):
        print(chunk.content, end='')

stream_langchain("How do airplanes stay in the air?", model_name='gpt-4o-mini')
stream_langchain("How do airplanes stay in the air?", model_name='gemini-2.0-flash')
stream_langchain("How do airplanes stay in the air?", model_name='anthropic--claude-3-haiku')
stream_langchain("How do airplanes stay in the air?", model_name='amazon--nova-premier')