# Streaming
Streaming delivers AI responses incrementally as they’re generated, enabling real-time display in chat UIs and reducing perceived latency.
## How It Works
```mermaid
sequenceDiagram
    participant Client
    participant ModelRelay
    participant Provider
    Client->>ModelRelay: POST /responses (Accept: ndjson)
    ModelRelay->>Provider: Forward request
    Provider-->>ModelRelay: start event
    ModelRelay-->>Client: {"type":"start",...}
    loop Token generation
        Provider-->>ModelRelay: token
        ModelRelay-->>Client: {"type":"update","delta":"..."}
    end
    Provider-->>ModelRelay: done
    ModelRelay-->>Client: {"type":"completion",...}
```
ModelRelay supports NDJSON (Newline-Delimited JSON) and Server-Sent Events (SSE) for streaming responses. Each NDJSON line (or SSE `data:` payload) is a complete JSON object representing a stream event:
```json
{"type":"start","request_id":"req_abc","model":"claude-sonnet-4-5"}
{"type":"update","delta":"Hello"}
{"type":"update","delta":" world"}
{"type":"completion","content":"Hello world","usage":{"input_tokens":10,"output_tokens":5},"stop_reason":"end_turn"}
```
To enable streaming, set the `Accept` header:

```
Accept: application/x-ndjson; profile="responses-stream/v2"
```

For SSE, use:

```
Accept: text/event-stream
```
All SDKs handle NDJSON automatically when you use streaming methods. SSE is useful for browser clients; the SSE event name matches the `type` field (for example, `event: update`).
> **Note:** `EventSource` only supports GET requests, so use a POST-capable SSE client (for example, `@microsoft/fetch-event-source`) for `/responses`. Workflow run events still support SSE via `Accept: text/event-stream`.
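Outside the SDKs, the NDJSON framing above is simple to parse by hand. The sketch below works purely on the wire format shown above (no ModelRelay APIs): it buffers raw chunks, which may split a JSON line anywhere, and emits one parsed event per complete line.

```typescript
// Incremental NDJSON parser: feed it raw chunks, get complete events back.
type StreamEvent = { type: string; [key: string]: unknown };

class NdjsonParser {
  private buffer = "";

  push(chunk: string): StreamEvent[] {
    this.buffer += chunk;
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? ""; // last piece may be an incomplete line
    return lines
      .filter((line) => line.trim().length > 0)
      .map((line) => JSON.parse(line) as StreamEvent);
  }
}

// Chunks can arrive mid-line; an event only surfaces once its newline lands.
const parser = new NdjsonParser();
parser.push('{"type":"start","request_id":"req_abc"}\n{"type":"upd');
parser.push('ate","delta":"Hello"}\n');
```

The same buffering approach applies to SSE, except the payload follows a `data:` prefix instead of occupying the whole line.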
## Event Types
| Type | Description |
|---|---|
| `start` | Stream opened, includes model and request ID |
| `update` | Text delta (partial content) |
| `completion` | Stream finished, includes usage and stop reason |
| `error` | Error occurred during generation |
| `tool_use_start` | Tool call started |
| `tool_use_delta` | Tool call argument delta |
| `tool_use_stop` | Tool call completed |
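In TypeScript these wire events map naturally onto a discriminated union, which lets the compiler narrow each case for you. A sketch based on the fields shown in the examples above (illustrative shapes, not the SDK's exact types):

```typescript
// Discriminated union over the wire-level event types listed above.
type WireEvent =
  | { type: "start"; request_id: string; model: string }
  | { type: "update"; delta: string }
  | { type: "completion"; content: string; stop_reason: string }
  | { type: "error"; message: string }
  | { type: "tool_use_start"; id: string; name: string }
  | { type: "tool_use_delta"; id: string; arguments_delta: string }
  | { type: "tool_use_stop"; id: string };

// Switching on `type` narrows the event, so each branch sees the right fields.
function render(event: WireEvent): string {
  switch (event.type) {
    case "update":
      return event.delta; // compiler knows `delta` exists here
    case "completion":
      return `\n[done: ${event.stop_reason}]`;
    case "error":
      return `\n[error: ${event.message}]`;
    default:
      return "";
  }
}
```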
## Quick Start
Stream text deltas for display:
```typescript
import { ModelRelay } from "@modelrelay/sdk";

const mr = ModelRelay.fromSecretKey(process.env.MODELRELAY_API_KEY!);

const stream = await mr.responses.streamTextDeltas(
  "claude-sonnet-4-5",
  "You are a helpful assistant.",
  "Write a haiku about programming."
);

for await (const delta of stream) {
  process.stdout.write(delta);
}
```
```go
stream, err := client.Responses.StreamTextDeltas(
    ctx,
    sdk.NewModelID("claude-sonnet-4-5"),
    "You are a helpful assistant.",
    "Write a haiku about programming.",
)
if err != nil {
    log.Fatal(err)
}
defer stream.Close()

for {
    delta, ok, err := stream.Next()
    if err != nil {
        log.Fatal(err)
    }
    if !ok {
        break
    }
    fmt.Print(delta)
}
```
```rust
use modelrelay::{Client, Config, ResponseBuilder, ApiKey};
use futures_util::StreamExt;

let client = Client::new(Config {
    api_key: Some(ApiKey::parse(&std::env::var("MODELRELAY_API_KEY")?)?),
    ..Default::default()
})?;

let mut stream = ResponseBuilder::text_prompt(
    "You are a helpful assistant.",
    "Write a haiku about programming.",
)
.model("claude-sonnet-4-5")
.stream_deltas(&client.responses())
.await?;

while let Some(delta) = stream.next().await {
    print!("{}", delta?);
}
```
## Full Event Stream
For more control, access all stream events:
```typescript
const req = mr.responses
  .new()
  .model("claude-sonnet-4-5")
  .system("You are a helpful assistant.")
  .user("Hello!")
  .build();

const stream = await mr.responses.stream(req);

for await (const event of stream) {
  switch (event.type) {
    case "message_start":
      console.log("Stream started:", event.responseId);
      console.log("Model:", event.model);
      break;
    case "message_delta":
      if (event.textDelta) {
        process.stdout.write(event.textDelta);
      }
      break;
    case "message_stop":
      console.log("\n---");
      console.log("Stop reason:", event.stopReason);
      console.log("Usage:", event.usage);
      break;
  }
}
```
```go
req, opts, _ := client.Responses.New().
    Model(sdk.NewModelID("claude-sonnet-4-5")).
    System("You are a helpful assistant.").
    User("Hello!").
    Build()

stream, err := client.Responses.Stream(ctx, req, opts...)
if err != nil {
    log.Fatal(err)
}
defer stream.Close()

for {
    event, ok, err := stream.Next()
    if err != nil {
        log.Fatal(err)
    }
    if !ok {
        break
    }
    switch event.Kind {
    case llm.StreamEventKindMessageStart:
        fmt.Println("Stream started:", event.ResponseID)
    case llm.StreamEventKindMessageDelta:
        fmt.Print(event.TextDelta)
    case llm.StreamEventKindMessageStop:
        fmt.Println("\n---")
        fmt.Println("Stop reason:", event.StopReason)
        fmt.Println("Usage:", event.Usage)
    }
}
```
```rust
use modelrelay::{ResponseBuilder, StreamEvent};
use futures_util::StreamExt;

let mut stream = ResponseBuilder::new()
    .model("claude-sonnet-4-5")
    .system("You are a helpful assistant.")
    .user("Hello!")
    .stream(&client.responses())
    .await?;

while let Some(event) = stream.next().await {
    let event = event?;
    match event {
        StreamEvent::MessageStart { response_id, model, .. } => {
            println!("Stream started: {}", response_id);
            println!("Model: {}", model);
        }
        StreamEvent::MessageDelta { text_delta, .. } => {
            if let Some(delta) = text_delta {
                print!("{}", delta);
            }
        }
        StreamEvent::MessageStop { stop_reason, usage, .. } => {
            println!("\n---");
            println!("Stop reason: {:?}", stop_reason);
            println!("Usage: {:?}", usage);
        }
        _ => {}
    }
}
```
## Collecting Stream Results
If you need both streaming display and the final response object:
```typescript
const stream = await mr.responses.stream(req);

// Option 1: Collect after streaming
let text = "";
for await (const event of stream) {
  if (event.type === "message_delta" && event.textDelta) {
    text += event.textDelta;
    process.stdout.write(event.textDelta);
  }
}

// Option 2: Use collect() to drain and build response
const response = await stream.collect();
console.log(response.output);
console.log(response.usage);
```
```go
stream, _ := client.Responses.Stream(ctx, req)

// Collect drains the stream and returns a Response
response, err := stream.Collect(ctx)
if err != nil {
    log.Fatal(err)
}
fmt.Println("Response ID:", response.ID)
fmt.Println("Usage:", response.Usage)
```
```rust
let stream = ResponseBuilder::new()
    .model("claude-sonnet-4-5")
    .user("Hello!")
    .stream(&client.responses())
    .await?;

// Collect drains the stream and returns a Response
let response = stream.collect().await?;
println!("Response ID: {}", response.id);
println!("Usage: {:?}", response.usage);
```
## Stream Timeouts
Configure timeouts to handle slow or stalled streams:
```typescript
const stream = await mr.responses.stream(req, {
  streamTimeouts: {
    ttftMs: 30_000, // Time to first token: 30s
    idleMs: 10_000, // Max time between tokens: 10s
    totalMs: 120_000, // Total stream duration: 2min
  },
});
```
```go
stream, err := client.Responses.Stream(ctx, req,
    sdk.WithStreamTTFTTimeout(30*time.Second),
    sdk.WithStreamIdleTimeout(10*time.Second),
    sdk.WithStreamTotalTimeout(2*time.Minute),
)
```
```rust
use std::time::Duration;
use modelrelay::StreamTimeouts;

let stream = ResponseBuilder::new()
    .model("claude-sonnet-4-5")
    .user("Hello!")
    .stream_with_timeouts(
        &client.responses(),
        StreamTimeouts {
            ttft: Some(Duration::from_secs(30)),   // Time to first token
            idle: Some(Duration::from_secs(10)),   // Max between tokens
            total: Some(Duration::from_secs(120)), // Total duration
        },
    )
    .await?;
```
### Timeout Types
| Timeout | Description | Use Case |
|---|---|---|
| TTFT | Time to first token | Detect if model is responding |
| Idle | Max gap between tokens | Detect stalled streams |
| Total | Overall stream duration | Prevent runaway requests |
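If you are consuming a raw stream without the SDK, an idle timeout can be layered onto any async iterator by racing each `next()` against a timer. A generic sketch of the idea (not a ModelRelay API; a real implementation should also cancel the underlying connection when the race is lost):

```typescript
// Wrap an async iterator so that any single gap between items longer than
// `idleMs` rejects the iteration, mimicking an SDK idle timeout.
async function* withIdleTimeout<T>(
  source: AsyncIterator<T>,
  idleMs: number,
): AsyncGenerator<T> {
  while (true) {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(
        () => reject(new Error(`idle timeout after ${idleMs}ms`)),
        idleMs,
      );
    });
    try {
      // Whichever settles first wins: the next item or the idle timer.
      const result = await Promise.race([source.next(), timeout]);
      if (result.done) return;
      yield result.value;
    } finally {
      clearTimeout(timer); // don't leave the timer pending between items
    }
  }
}
```

TTFT is just this same race applied to the first item only, and a total timeout is one timer spanning the whole loop.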
## Error Handling
```typescript
import {
  ModelRelay,
  APIError,
  TransportError,
  StreamTimeoutError,
} from "@modelrelay/sdk";

try {
  const stream = await mr.responses.stream(req);
  for await (const event of stream) {
    if (event.type === "message_delta" && event.textDelta) {
      process.stdout.write(event.textDelta);
    }
  }
} catch (error) {
  if (error instanceof StreamTimeoutError) {
    console.error(`Stream timeout (${error.kind}): ${error.timeoutMs}ms`);
  } else if (error instanceof APIError) {
    console.error(`API error: ${error.message}`);
  } else if (error instanceof TransportError) {
    console.error(`Connection error: ${error.message}`);
  } else {
    throw error;
  }
}
```
```go
import "errors"

stream, err := client.Responses.Stream(ctx, req)
if err != nil {
    log.Fatal(err)
}
defer stream.Close()

for {
    event, ok, err := stream.Next()
    if err != nil {
        var timeoutErr sdk.StreamTimeoutError
        var apiErr sdk.APIError
        var transportErr sdk.TransportError
        if errors.As(err, &timeoutErr) {
            log.Printf("Stream timeout (%s): %s", timeoutErr.Kind, timeoutErr.Timeout)
        } else if errors.As(err, &apiErr) {
            log.Printf("API error: %s", apiErr.Message)
        } else if errors.As(err, &transportErr) {
            log.Printf("Connection error: %s", transportErr.Message)
        } else {
            log.Fatal(err)
        }
        break
    }
    if !ok {
        break
    }
    fmt.Print(event.TextDelta)
}
```
```rust
use modelrelay::errors::{Error, StreamTimeoutError};
use futures_util::StreamExt;

let mut stream = ResponseBuilder::new()
    .model("claude-sonnet-4-5")
    .user("Hello!")
    .stream(&client.responses())
    .await?;

while let Some(result) = stream.next().await {
    match result {
        Ok(event) => {
            if let StreamEvent::MessageDelta { text_delta: Some(delta), .. } = event {
                print!("{}", delta);
            }
        }
        Err(Error::StreamTimeout(StreamTimeoutError { kind, timeout })) => {
            eprintln!("Stream timeout ({:?}): {:?}", kind, timeout);
            break;
        }
        Err(Error::API(e)) => {
            eprintln!("API error: {}", e.message);
            break;
        }
        Err(Error::Transport(e)) => {
            eprintln!("Connection error: {}", e.message);
            break;
        }
        Err(e) => return Err(e.into()),
    }
}
```
## Cancelling Streams
```typescript
const stream = await mr.responses.stream(req);

// Cancel from outside the loop
setTimeout(() => {
  stream.cancel("user cancelled");
}, 5000);

try {
  for await (const event of stream) {
    process.stdout.write(event.textDelta ?? "");
  }
} catch (error) {
  console.log("Stream cancelled");
}
```
Use context cancellation:
```go
ctx, cancel := context.WithCancel(context.Background())

// Cancel from another goroutine
go func() {
    time.Sleep(5 * time.Second)
    cancel()
}()

stream, _ := client.Responses.Stream(ctx, req)
defer stream.Close()

for {
    event, ok, err := stream.Next()
    if err != nil {
        if errors.Is(err, context.Canceled) {
            fmt.Println("Stream cancelled")
        }
        break
    }
    if !ok {
        break
    }
    fmt.Print(event.TextDelta)
}
```
Use `tokio::select!` for cancellation:
```rust
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;

let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();

// Cancel from another task
tokio::spawn(async move {
    sleep(Duration::from_secs(5)).await;
    cancel_clone.cancel();
});

let mut stream = ResponseBuilder::new()
    .model("claude-sonnet-4-5")
    .user("Hello!")
    .stream(&client.responses())
    .await?;

loop {
    tokio::select! {
        _ = cancel.cancelled() => {
            println!("Stream cancelled");
            break;
        }
        event = stream.next() => {
            match event {
                Some(Ok(e)) => print!("{}", e.text_delta().unwrap_or_default()),
                Some(Err(e)) => return Err(e.into()),
                None => break,
            }
        }
    }
}
```
## Display Patterns
### Cursor Animation
Show a typing indicator while waiting:
```typescript
let content = "";
const cursor = "▋";

for await (const event of stream) {
  if (event.type === "message_delta" && event.textDelta) {
    content += event.textDelta;
    // Clear line and redraw with cursor
    process.stdout.write(`\r${content}${cursor}`);
  }
}

// Remove cursor when done
process.stdout.write(`\r${content} \n`);
```
### Buffered Updates
Batch updates for smoother rendering:
```typescript
let buffer = "";
let lastRender = Date.now();
const RENDER_INTERVAL = 50; // ms

for await (const event of stream) {
  if (event.type === "message_delta" && event.textDelta) {
    buffer += event.textDelta;
    const now = Date.now();
    if (now - lastRender >= RENDER_INTERVAL) {
      process.stdout.write(buffer);
      buffer = "";
      lastRender = now;
    }
  }
}

// Flush remaining buffer
if (buffer) {
  process.stdout.write(buffer);
}
```
## Streaming with Tool Calls
Handle tool calls in streaming mode:
```typescript
import { ToolCallAccumulator } from "@modelrelay/sdk";

const stream = await mr.responses.stream(req);
const toolAccumulator = new ToolCallAccumulator();

for await (const event of stream) {
  switch (event.type) {
    case "message_delta":
      if (event.textDelta) {
        process.stdout.write(event.textDelta);
      }
      break;
    case "tool_use_start":
    case "tool_use_delta":
      if (event.toolCallDelta) {
        toolAccumulator.processDelta(event.toolCallDelta);
      }
      break;
    case "tool_use_stop": {
      // Braces scope the const to this case, avoiding a lexical-declaration error.
      const toolCalls = toolAccumulator.getToolCalls();
      for (const call of toolCalls) {
        console.log(`Tool: ${call.function?.name}`);
        console.log(`Args: ${call.function?.arguments}`);
      }
      break;
    }
  }
}
```
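Under the hood, an accumulator like this concatenates argument deltas per tool-call id until the stop event arrives, because the argument JSON only becomes parseable once the call is complete. A minimal sketch of the idea (hypothetical field names, not the SDK's `ToolCallAccumulator`):

```typescript
// Accumulates streamed tool-call fragments keyed by call id.
interface ToolCallDelta {
  id: string;
  name?: string;           // present on the first fragment
  argumentsDelta?: string; // partial JSON, appended in arrival order
}

class MiniToolCallAccumulator {
  private calls = new Map<string, { name: string; args: string }>();

  processDelta(delta: ToolCallDelta): void {
    const call = this.calls.get(delta.id) ?? { name: "", args: "" };
    if (delta.name) call.name = delta.name;
    if (delta.argumentsDelta) call.args += delta.argumentsDelta;
    this.calls.set(delta.id, call);
  }

  getToolCalls(): { id: string; name: string; arguments: unknown }[] {
    return [...this.calls.entries()].map(([id, c]) => ({
      id,
      name: c.name,
      arguments: JSON.parse(c.args), // only valid after tool_use_stop
    }));
  }
}
```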
## Best Practices
- **Always close streams**: use `defer stream.Close()` (Go) or ensure the async iterator completes (TypeScript)
- **Set appropriate timeouts**: prevent hung connections with TTFT and idle timeouts
- **Handle partial content**: the final `completion` event includes the full content; deltas are incremental
- **Check for errors in events**: some providers may send error events mid-stream
- **Use context cancellation**: pass cancellable contexts to allow clean shutdown
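The cleanup advice above can be packaged into a small helper that guarantees the stream is released even when the consumer exits early or throws. A generic sketch (the `cancel()` method name follows the TypeScript examples on this page; adjust to your SDK version):

```typescript
// Any stream that can be iterated and cancelled.
interface CancellableStream<T> extends AsyncIterable<T> {
  cancel(reason?: string): void;
}

// Consume events one at a time; cancel() runs on early exit, error, or
// normal completion thanks to the finally block.
async function consumeStream<T>(
  stream: CancellableStream<T>,
  onEvent: (event: T) => boolean | void, // return false to stop early
): Promise<void> {
  try {
    for await (const event of stream) {
      if (onEvent(event) === false) break;
    }
  } finally {
    stream.cancel("consumer finished");
  }
}
```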
## Next Steps
- First Request — Basic request examples
- Tool Use — Streaming with function calls
- Structured Output — Streaming typed JSON