Workflows
Workflows let you define multi-step AI pipelines as declarative specs. Instead of writing imperative code to orchestrate multiple LLM calls, you define a directed acyclic graph (DAG) of nodes and let ModelRelay handle execution, parallelism, and state management.
Why Workflows?
flowchart LR
A[Input] --> B[Node 1]
A --> C[Node 2]
B --> D[Node 3]
C --> D
D --> E[Output]
Without workflows: You write imperative code to call multiple models, manage intermediate state, handle errors, and coordinate parallel execution.
With workflows: You declare what you want (nodes, connections, outputs) and the runtime handles how to execute it efficiently.
Benefits:
- Parallel execution: Independent nodes run concurrently up to `max_parallelism`
- Automatic retries: Failed nodes can be retried with exponential backoff
- Event streaming: Monitor progress in real-time via event stream
- Idempotent runs: Retrying a request with the same idempotency key returns the cached result
- Cost tracking: Aggregated usage across all nodes
Quick Start
Define a workflow spec and run it:
import { ModelRelay, WorkflowKinds, WorkflowNodeTypes } from "@modelrelay/sdk";
const mr = ModelRelay.fromSecretKey(process.env.MODELRELAY_API_KEY!);
// Define a simple workflow with one LLM node
const spec = {
kind: WorkflowKinds.WorkflowV0,
name: "greeting-workflow",
nodes: [
{
id: "greeter",
type: WorkflowNodeTypes.LLMResponses,
input: {
request: {
model: "claude-sonnet-4-20250514",
system: "You are a friendly assistant.",
input: [{ role: "user", content: "Say hello!" }],
},
},
},
],
outputs: [
{ name: "greeting", from: "greeter" },
],
};
// Start the run
const { run_id } = await mr.runs.create(spec);
// Stream events until completion
for await (const event of await mr.runs.events(run_id)) {
if (event.type === "node_output_delta" && event.delta.text_delta) {
process.stdout.write(event.delta.text_delta);
}
if (event.type === "run_completed") {
console.log("\nRun completed!");
}
}
ctx := context.Background()
// Define a simple workflow with one LLM node
spec := sdk.WorkflowV0().
Name("greeting-workflow").
Node(sdk.LLMResponsesNode("greeter", sdk.LLMResponsesInput{
Request: sdk.ResponsesRequest{
Model: sdk.NewModelID("claude-sonnet-4-20250514"),
System: "You are a friendly assistant.",
Input: []sdk.InputItem{{Role: "user", Content: "Say hello!"}},
},
})).
Output("greeting", "greeter").
Build()
// Start the run
result, err := client.Runs.Create(ctx, spec)
if err != nil {
log.Fatal(err)
}
// Stream events until completion
stream, err := client.Runs.StreamEvents(ctx, result.RunID)
if err != nil {
log.Fatal(err)
}
defer stream.Close()
for {
event, ok, err := stream.Next()
if err != nil {
log.Fatal(err)
}
if !ok {
break
}
switch ev := event.(type) {
case *sdk.NodeOutputDeltaEvent:
if ev.Delta.TextDelta != "" {
fmt.Print(ev.Delta.TextDelta)
}
case *sdk.RunCompletedEvent:
fmt.Println("\nRun completed!")
}
}
use futures_util::StreamExt;
use modelrelay::{Client, WorkflowBuilder, RunEvent};
let client = Client::from_api_key(std::env::var("MODELRELAY_API_KEY")?)?
.build()?;
// Define a simple workflow with one LLM node
let spec = WorkflowBuilder::new()
.name("greeting-workflow")
.llm_responses_node("greeter", |node| {
node.model("claude-sonnet-4-20250514")
.system("You are a friendly assistant.")
.user("Say hello!")
})
.output("greeting", "greeter")
.build();
// Start the run
let result = client.runs().create(&spec).await?;
// Stream events until completion
let mut stream = client.runs().stream_events(&result.run_id).await?;
while let Some(event) = stream.next().await {
let event = event?;
match event {
RunEvent::NodeOutputDelta { delta, .. } => {
if let Some(text) = delta.text_delta {
print!("{}", text);
}
}
RunEvent::RunCompleted { .. } => {
println!("\nRun completed!");
}
_ => {}
}
}
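Once the stream reports completion, the final outputs can be read from the run itself (see Monitoring Runs below). A minimal TypeScript sketch, assuming the quick-start run above finished successfully:
// Fetch the completed run and read the declared outputs.
const run = await mr.runs.get(run_id);
if (run.status === "succeeded") {
  // Output keys match the names declared in the spec ("greeting" here).
  console.log("Greeting:", run.outputs?.greeting);
}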
Workflow Spec Format
A workflow spec is a JSON document with this structure:
{
"kind": "workflow.v0",
"name": "my-workflow",
"execution": {
"max_parallelism": 4,
"node_timeout_ms": 60000,
"run_timeout_ms": 300000
},
"nodes": [...],
"edges": [...],
"outputs": [...]
}
| Field | Type | Description |
|---|---|---|
| `kind` | string | Must be `"workflow.v0"` |
| `name` | string | Optional workflow name for identification |
| `execution` | object | Optional execution configuration |
| `nodes` | array | List of nodes (required) |
| `edges` | array | Explicit edges between nodes (optional) |
| `outputs` | array | Values to extract from completed nodes (required) |
Execution Config
| Field | Default | Description |
|---|---|---|
| `max_parallelism` | 8 | Maximum concurrent node executions |
| `node_timeout_ms` | 60000 | Timeout for individual node execution |
| `run_timeout_ms` | 300000 | Timeout for the entire workflow run |
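As a sketch, a spec that tightens these limits looks like this (values are illustrative; unset fields fall back to the defaults above):
const spec = {
  kind: WorkflowKinds.WorkflowV0,
  name: "bounded-workflow",
  execution: {
    max_parallelism: 4,      // at most 4 nodes run concurrently
    node_timeout_ms: 30000,  // fail any single node after 30s
    run_timeout_ms: 120000,  // fail the whole run after 2 minutes
  },
  nodes: [/* ... */],
  outputs: [/* ... */],
};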
Node Types
llm.responses
Makes an LLM API call. This is the primary node type for AI inference.
{
"id": "analyzer",
"type": "llm.responses",
"input": {
"request": {
"model": "claude-sonnet-4-20250514",
"system": "You are a helpful assistant.",
"input": [
{ "role": "user", "content": "Analyze this data..." }
],
"max_output_tokens": 1024,
"temperature": 0.7
},
"stream": true,
"tool_execution": { "mode": "server" },
"tool_limits": {
"max_llm_calls": 10,
"max_tool_calls_per_step": 5
},
"bindings": [...]
}
}
| Field | Description |
|---|---|
| `request` | Standard responses request (model, system, input, tools, etc.) |
| `stream` | Enable streaming for real-time output deltas |
| `tool_execution` | Tool execution mode: `server` (auto-execute) or `client` (wait for results) |
| `tool_limits` | Limits on tool execution loops |
| `bindings` | Inject upstream outputs into the request |
join.all
Synchronization point that waits for all upstream nodes to complete before allowing downstream nodes to start.
{
"id": "sync",
"type": "join.all"
}
Use join.all when you need to collect results from parallel branches before continuing.
transform.json
Transforms JSON outputs from upstream nodes into a new structure.
{
"id": "combiner",
"type": "transform.json",
"input": {
"object": {
"analysis": { "from": "analyzer", "pointer": "/content" },
"summary": { "from": "summarizer", "pointer": "/content" }
}
}
}
| Field | Description |
|---|---|
| `object` | Build a new object from upstream outputs |
| `merge` | Merge multiple upstream outputs |
The pointer field uses JSON Pointer syntax (RFC 6901) to extract specific values.
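As an illustration, assume an upstream node's output is the (hypothetical) object { content: "...", metadata: { source: "..." } }; then:
// "/content"          selects the content string
// "/metadata/source"  selects the nested source value
// Omitting pointer entirely selects the whole node output.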
Edges
Edges define dependencies between nodes. If node B depends on node A’s output, add an edge:
{
"edges": [
{ "from": "analyzer", "to": "summarizer" }
]
}
Implicit edges: When using bindings in an llm.responses node, edges are inferred automatically from the binding sources.
Explicit edges: Use for join.all nodes or to enforce execution order without data flow.
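For example, in the sketch below no explicit edge is needed between analyzer and summarizer because the binding already implies it; a join.all fan-in, by contrast, must be declared:
// The binding from "analyzer" creates an implicit analyzer -> summarizer edge.
{
  id: "summarizer",
  type: WorkflowNodeTypes.LLMResponses,
  input: {
    request: {
      model: "claude-sonnet-4-20250514",
      system: "Summarize the analysis.",
      input: [{ role: "user", content: "{{analysis}}" }],
    },
    bindings: [{ from: "analyzer", to: "analysis" }],
  },
},
// A join.all node carries no bindings, so its fan-in is declared explicitly:
// edges: [{ from: "analyzer", to: "sync" }, { from: "summarizer", to: "sync" }]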
Outputs
Outputs define what values to extract from completed nodes:
{
"outputs": [
{ "name": "result", "from": "analyzer" },
{ "name": "summary", "from": "summarizer", "pointer": "/content" }
]
}
| Field | Description |
|---|---|
| `name` | Key for this output in the final result |
| `from` | Node ID to extract from |
| `pointer` | Optional JSON Pointer to extract specific value |
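On a completed run, these names become the keys of the run's outputs object (sketch, reusing the example above):
const run = await mr.runs.get(run_id);
console.log(run.outputs?.result);   // full output of "analyzer"
console.log(run.outputs?.summary);  // value extracted via the /content pointer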
Bindings
Bindings inject upstream node outputs into downstream request templates:
{
"id": "synthesizer",
"type": "llm.responses",
"input": {
"request": {
"model": "claude-sonnet-4-20250514",
"system": "Synthesize the following analyses.",
"input": [
{ "role": "user", "content": "Analysis A: {{analysis_a}}\n\nAnalysis B: {{analysis_b}}" }
]
},
"bindings": [
{ "from": "agent_a", "to": "analysis_a" },
{ "from": "agent_b", "to": "analysis_b", "pointer": "/content" }
]
}
}
| Field | Description |
|---|---|
| `from` | Source node ID |
| `to` | Placeholder name in the request (use `{{name}}` in content) |
| `pointer` | Optional JSON Pointer for nested extraction |
| `encoding` | Optional: `json` (default) or `json_string` |
Multi-Agent Patterns
Parallel Analysis
Run multiple specialized agents in parallel, then synthesize results:
flowchart LR
subgraph parallel["Parallel Execution"]
T[Technical Analyst]
B[Business Analyst]
R[Risk Analyst]
end
T --> J[Join]
B --> J
R --> J
J --> S[Synthesizer]
S --> O[Final Report]
const spec = {
kind: WorkflowKinds.WorkflowV0,
name: "parallel-analysis",
execution: { max_parallelism: 3 },
nodes: [
// Three parallel agents
{
id: "technical",
type: WorkflowNodeTypes.LLMResponses,
input: {
request: {
model: "claude-sonnet-4-20250514",
system: "You are a technical analyst. Focus on implementation details.",
input: [{ role: "user", content: "Analyze: {{topic}}" }],
},
},
},
{
id: "business",
type: WorkflowNodeTypes.LLMResponses,
input: {
request: {
model: "claude-sonnet-4-20250514",
system: "You are a business analyst. Focus on market impact.",
input: [{ role: "user", content: "Analyze: {{topic}}" }],
},
},
},
{
id: "risk",
type: WorkflowNodeTypes.LLMResponses,
input: {
request: {
model: "claude-sonnet-4-20250514",
system: "You are a risk analyst. Focus on potential issues.",
input: [{ role: "user", content: "Analyze: {{topic}}" }],
},
},
},
// Synchronization point
{
id: "join",
type: WorkflowNodeTypes.JoinAll,
},
// Synthesis agent that combines all analyses
{
id: "synthesizer",
type: WorkflowNodeTypes.LLMResponses,
input: {
request: {
model: "claude-sonnet-4-20250514",
system: "Synthesize these analyses into a comprehensive report.",
input: [{
role: "user",
content: "Technical: {{technical}}\n\nBusiness: {{business}}\n\nRisk: {{risk}}",
}],
},
bindings: [
{ from: "technical", to: "technical" },
{ from: "business", to: "business" },
{ from: "risk", to: "risk" },
],
},
},
],
edges: [
{ from: "technical", to: "join" },
{ from: "business", to: "join" },
{ from: "risk", to: "join" },
{ from: "join", to: "synthesizer" },
],
outputs: [
{ name: "report", from: "synthesizer" },
],
};
spec := sdk.WorkflowV0().
Name("parallel-analysis").
Execution(sdk.ExecutionConfig{MaxParallelism: 3}).
// Three parallel agents
Node(sdk.LLMResponsesNode("technical", sdk.LLMResponsesInput{
Request: sdk.ResponsesRequest{
Model: sdk.NewModelID("claude-sonnet-4-20250514"),
System: "You are a technical analyst. Focus on implementation details.",
Input: []sdk.InputItem{{Role: "user", Content: "Analyze: {{topic}}"}},
},
})).
Node(sdk.LLMResponsesNode("business", sdk.LLMResponsesInput{
Request: sdk.ResponsesRequest{
Model: sdk.NewModelID("claude-sonnet-4-20250514"),
System: "You are a business analyst. Focus on market impact.",
Input: []sdk.InputItem{{Role: "user", Content: "Analyze: {{topic}}"}},
},
})).
Node(sdk.LLMResponsesNode("risk", sdk.LLMResponsesInput{
Request: sdk.ResponsesRequest{
Model: sdk.NewModelID("claude-sonnet-4-20250514"),
System: "You are a risk analyst. Focus on potential issues.",
Input: []sdk.InputItem{{Role: "user", Content: "Analyze: {{topic}}"}},
},
})).
// Synchronization point
Node(sdk.JoinAllNode("join")).
// Synthesis agent
Node(sdk.LLMResponsesNodeWithBindings("synthesizer", sdk.LLMResponsesInput{
Request: sdk.ResponsesRequest{
Model: sdk.NewModelID("claude-sonnet-4-20250514"),
System: "Synthesize these analyses into a comprehensive report.",
Input: []sdk.InputItem{{Role: "user", Content: "Technical: {{technical}}\n\nBusiness: {{business}}\n\nRisk: {{risk}}"}},
},
}, []sdk.Binding{
{From: "technical", To: "technical"},
{From: "business", To: "business"},
{From: "risk", To: "risk"},
})).
// Edges
Edge("technical", "join").
Edge("business", "join").
Edge("risk", "join").
Edge("join", "synthesizer").
// Output
Output("report", "synthesizer").
Build()
use modelrelay::WorkflowBuilder;
let spec = WorkflowBuilder::new()
.name("parallel-analysis")
.max_parallelism(3)
// Three parallel agents
.llm_responses_node("technical", |node| {
node.model("claude-sonnet-4-20250514")
.system("You are a technical analyst. Focus on implementation details.")
.user("Analyze: {{topic}}")
})
.llm_responses_node("business", |node| {
node.model("claude-sonnet-4-20250514")
.system("You are a business analyst. Focus on market impact.")
.user("Analyze: {{topic}}")
})
.llm_responses_node("risk", |node| {
node.model("claude-sonnet-4-20250514")
.system("You are a risk analyst. Focus on potential issues.")
.user("Analyze: {{topic}}")
})
// Synchronization point
.join_all_node("join")
// Synthesis agent
.llm_responses_node_with_bindings(
"synthesizer",
|node| {
node.model("claude-sonnet-4-20250514")
.system("Synthesize these analyses into a comprehensive report.")
.user("Technical: {{technical}}\n\nBusiness: {{business}}\n\nRisk: {{risk}}")
},
vec![
("technical", "technical"),
("business", "business"),
("risk", "risk"),
],
)
// Edges
.edge("technical", "join")
.edge("business", "join")
.edge("risk", "join")
.edge("join", "synthesizer")
// Output
.output("report", "synthesizer")
.build();
Monitoring Runs
Event Streaming
Stream events to monitor run progress in real-time:
const { run_id } = await mr.runs.create(spec);
// Stream all events
for await (const event of await mr.runs.events(run_id)) {
switch (event.type) {
case "run_started":
console.log("Run started");
break;
case "node_started":
console.log(`Node ${event.node_id} started`);
break;
case "node_output_delta":
// Real-time text streaming from LLM nodes
if (event.delta.text_delta) {
process.stdout.write(event.delta.text_delta);
}
break;
case "node_llm_call":
console.log(`LLM call: ${event.llm_call.model}, usage: ${JSON.stringify(event.llm_call.usage)}`);
break;
case "node_tool_call":
console.log(`Tool call: ${event.tool_call.tool_call.name}`);
break;
case "node_succeeded":
console.log(`Node ${event.node_id} succeeded`);
break;
case "node_failed":
console.error(`Node ${event.node_id} failed: ${event.error.message}`);
break;
case "run_completed":
console.log("Run completed!");
break;
case "run_failed":
console.error(`Run failed: ${event.error.message}`);
break;
}
}
result, _ := client.Runs.Create(ctx, spec)
stream, _ := client.Runs.StreamEvents(ctx, result.RunID)
defer stream.Close()
for {
event, ok, err := stream.Next()
if err != nil {
log.Fatal(err)
}
if !ok {
break
}
switch ev := event.(type) {
case *sdk.RunStartedEvent:
fmt.Println("Run started")
case *sdk.NodeStartedEvent:
fmt.Printf("Node %s started\n", ev.NodeID)
case *sdk.NodeOutputDeltaEvent:
if ev.Delta.TextDelta != "" {
fmt.Print(ev.Delta.TextDelta)
}
case *sdk.NodeLLMCallEvent:
fmt.Printf("LLM call: %s, usage: %+v\n", ev.LLMCall.Model, ev.LLMCall.Usage)
case *sdk.NodeToolCallEvent:
fmt.Printf("Tool call: %s\n", ev.ToolCall.ToolCall.Name)
case *sdk.NodeSucceededEvent:
fmt.Printf("Node %s succeeded\n", ev.NodeID)
case *sdk.NodeFailedEvent:
fmt.Printf("Node %s failed: %s\n", ev.NodeID, ev.Error.Message)
case *sdk.RunCompletedEvent:
fmt.Println("Run completed!")
case *sdk.RunFailedEvent:
fmt.Printf("Run failed: %s\n", ev.Error.Message)
}
}
use futures_util::StreamExt;
use modelrelay::RunEvent;
let result = client.runs().create(&spec).await?;
let mut stream = client.runs().stream_events(&result.run_id).await?;
while let Some(event) = stream.next().await {
let event = event?;
match event {
RunEvent::RunStarted { .. } => {
println!("Run started");
}
RunEvent::NodeStarted { node_id, .. } => {
println!("Node {} started", node_id);
}
RunEvent::NodeOutputDelta { delta, .. } => {
if let Some(text) = delta.text_delta {
print!("{}", text);
}
}
RunEvent::NodeLLMCall { llm_call, .. } => {
println!("LLM call: {}, usage: {:?}", llm_call.model, llm_call.usage);
}
RunEvent::NodeToolCall { tool_call, .. } => {
println!("Tool call: {}", tool_call.tool_call.name);
}
RunEvent::NodeSucceeded { node_id, .. } => {
println!("Node {} succeeded", node_id);
}
RunEvent::NodeFailed { node_id, error, .. } => {
eprintln!("Node {} failed: {}", node_id, error.message);
}
RunEvent::RunCompleted { .. } => {
println!("Run completed!");
}
RunEvent::RunFailed { error, .. } => {
eprintln!("Run failed: {}", error.message);
}
_ => {}
}
}
Event Types
| Event | Description |
|---|---|
| `run_compiled` | Workflow spec validated and execution plan created |
| `run_started` | Run execution began |
| `run_completed` | Run finished successfully, outputs available |
| `run_failed` | Run failed with error |
| `run_canceled` | Run was canceled |
| `node_started` | Node execution began |
| `node_succeeded` | Node completed successfully |
| `node_failed` | Node failed with error |
| `node_llm_call` | LLM API call completed (includes usage) |
| `node_tool_call` | Tool call initiated |
| `node_tool_result` | Tool execution result received |
| `node_waiting` | Node waiting for client-side tool results |
| `node_output_delta` | Streaming text delta from LLM |
| `node_output` | Node output artifact available |
Get Run Status
Poll run status without streaming:
const run = await mr.runs.get(run_id);
console.log("Status:", run.status); // running | succeeded | failed | canceled
console.log("Outputs:", run.outputs);
console.log("Cost:", run.cost);
// Check individual node status
for (const node of run.nodes ?? []) {
console.log(`${node.id}: ${node.status}`);
}
run, err := client.Runs.Get(ctx, runID)
if err != nil {
log.Fatal(err)
}
fmt.Println("Status:", run.Status)
fmt.Println("Outputs:", run.Outputs)
fmt.Println("Cost:", run.Cost)
for _, node := range run.Nodes {
fmt.Printf("%s: %s\n", node.ID, node.Status)
}
let run = client.runs().get(&run_id).await?;
println!("Status: {:?}", run.status); // Running | Succeeded | Failed | Canceled
println!("Outputs: {:?}", run.outputs);
println!("Cost: {:?}", run.cost);
// Check individual node status
if let Some(nodes) = &run.nodes {
for node in nodes {
println!("{}: {:?}", node.id, node.status);
}
}
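If real-time deltas aren't needed, a simple loop that polls until the run leaves the running state is often enough. A TypeScript sketch (statuses follow the values shown above):
let run = await mr.runs.get(run_id);
while (run.status === "running") {
  // Poll every couple of seconds; tune the interval to your workload.
  await new Promise((resolve) => setTimeout(resolve, 2000));
  run = await mr.runs.get(run_id);
}
if (run.status === "succeeded") {
  console.log("Outputs:", run.outputs);
} else {
  console.error("Run ended with status:", run.status);
}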
Tool Execution Modes
Server-Side (Default)
Tools execute automatically on the server. The workflow engine handles the tool loop internally.
{
"tool_execution": { "mode": "server" }
}
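A server-mode LLM node therefore only needs its tool definitions and, optionally, limits on the loop; the node's output is the final response after all tool calls have been resolved. A sketch (tool definitions elided):
{
  id: "agent",
  type: WorkflowNodeTypes.LLMResponses,
  input: {
    request: {
      model: "claude-sonnet-4-20250514",
      system: "You can use tools to help users.",
      input: [{ role: "user", content: "..." }],
      tools: [/* tool definitions */],
    },
    tool_execution: { mode: "server" },
    // Optional guardrails on the server-side tool loop:
    tool_limits: { max_llm_calls: 10, max_tool_calls_per_step: 5 },
  },
},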
Client-Side
Tools wait for your code to provide results. Use this when tools require local resources or user interaction.
const spec = {
kind: WorkflowKinds.WorkflowV0,
nodes: [
{
id: "agent",
type: WorkflowNodeTypes.LLMResponses,
input: {
request: {
model: "claude-sonnet-4-20250514",
system: "You can use tools to help users.",
input: [{ role: "user", content: "What's the weather in Tokyo?" }],
tools: [
{
type: "function",
function: {
name: "get_weather",
description: "Get weather for a city",
parameters: {
type: "object",
properties: { city: { type: "string" } },
required: ["city"],
},
},
},
],
},
tool_execution: { mode: "client" },
tool_limits: { wait_ttl_ms: 60000 },
},
},
],
outputs: [{ name: "response", from: "agent" }],
};
const { run_id } = await mr.runs.create(spec);
// Monitor for waiting state
for await (const event of await mr.runs.events(run_id)) {
if (event.type === "node_waiting") {
// Execute tools locally
const toolResults = await Promise.all(
event.waiting.pending_tool_calls.map(async (call) => {
const args = JSON.parse(call.arguments);
// Execute your tool logic
const result = await getWeather(args.city);
return {
tool_call_id: call.tool_call_id,
output: JSON.stringify(result),
};
})
);
// Submit results to continue the run
await mr.runs.submitToolResults(run_id, {
node_id: event.node_id,
tool_results: toolResults,
});
}
if (event.type === "run_completed") {
break;
}
}
spec := sdk.WorkflowV0().
Node(sdk.LLMResponsesNode("agent", sdk.LLMResponsesInput{
Request: sdk.ResponsesRequest{
Model: sdk.NewModelID("claude-sonnet-4-20250514"),
System: "You can use tools to help users.",
Input: []sdk.InputItem{{Role: "user", Content: "What's the weather in Tokyo?"}},
Tools: []sdk.Tool{{
Type: "function",
Function: sdk.ToolFunction{
Name: "get_weather",
Description: "Get weather for a city",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"city": map[string]any{"type": "string"},
},
"required": []string{"city"},
},
},
}},
},
ToolExecution: &sdk.ToolExecution{Mode: "client"},
ToolLimits: &sdk.ToolLimits{WaitTTLMs: 60000},
})).
Output("response", "agent").
Build()
result, _ := client.Runs.Create(ctx, spec)
stream, _ := client.Runs.StreamEvents(ctx, result.RunID)
for {
event, ok, _ := stream.Next()
if !ok {
break
}
if ev, ok := event.(*sdk.NodeWaitingEvent); ok {
var toolResults []sdk.ToolResult
for _, call := range ev.Waiting.PendingToolCalls {
var args struct{ City string }
json.Unmarshal([]byte(call.Arguments), &args)
weather := getWeather(args.City)
toolResults = append(toolResults, sdk.ToolResult{
ToolCallID: call.ToolCallID,
Output: weather,
})
}
client.Runs.SubmitToolResults(ctx, result.RunID, sdk.ToolResultsRequest{
NodeID: ev.NodeID,
ToolResults: toolResults,
})
}
if _, ok := event.(*sdk.RunCompletedEvent); ok {
break
}
}
use futures_util::StreamExt;
use modelrelay::{WorkflowBuilder, RunEvent, ToolResult, ToolResultsRequest};
use serde::Deserialize;
use serde_json::json;
let spec = WorkflowBuilder::new()
.llm_responses_node("agent", |node| {
node.model("claude-sonnet-4-20250514")
.system("You can use tools to help users.")
.user("What's the weather in Tokyo?")
.tool("get_weather", "Get weather for a city", json!({
"type": "object",
"properties": { "city": { "type": "string" } },
"required": ["city"]
}))
.tool_execution_client()
.wait_ttl_ms(60000)
})
.output("response", "agent")
.build();
let result = client.runs().create(&spec).await?;
let mut stream = client.runs().stream_events(&result.run_id).await?;
while let Some(event) = stream.next().await {
let event = event?;
match event {
RunEvent::NodeWaiting { node_id, waiting, .. } => {
#[derive(Deserialize)]
struct Args { city: String }
let mut tool_results = Vec::new();
for call in waiting.pending_tool_calls {
let args: Args = serde_json::from_str(&call.arguments)?;
let weather = get_weather(&args.city).await;
tool_results.push(ToolResult {
tool_call_id: call.tool_call_id,
output: weather,
});
}
client.runs().submit_tool_results(
&result.run_id,
ToolResultsRequest {
node_id,
tool_results,
},
).await?;
}
RunEvent::RunCompleted { .. } => break,
_ => {}
}
}
Idempotency
Use idempotency keys to safely retry requests:
const { run_id } = await mr.runs.create(spec, {
idempotencyKey: "order-123-analysis",
});
If a run with the same idempotency key already exists and completed successfully, the cached result is returned. This prevents duplicate processing when retrying failed requests.
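This makes a naive retry wrapper safe to write, as in the sketch below (using the mr client from the Quick Start; createRunWithRetry is a hypothetical helper):
async function createRunWithRetry(spec: object, key: string, attempts = 3) {
  let lastErr: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      // Reusing the same idempotency key on every attempt means a retry after a
      // transient failure returns the cached result if the run already completed.
      return await mr.runs.create(spec, { idempotencyKey: key });
    } catch (err) {
      lastErr = err;
    }
  }
  throw lastErr;
}
const { run_id } = await createRunWithRetry(spec, "order-123-analysis");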
Best Practices
- Use meaningful node IDs: Names like `technical_analyst` are clearer than `node1`
- Set appropriate timeouts: Configure `node_timeout_ms` and `run_timeout_ms` based on expected execution time
- Limit parallelism: Don’t set `max_parallelism` higher than necessary; it increases resource usage
- Enable streaming: Set `stream: true` on LLM nodes for real-time progress visibility
- Use `join.all` for synchronization: When multiple branches need to complete before continuing, use a `join.all` node
- Extract specific values: Use JSON Pointer in bindings and outputs to extract exactly what you need
- Handle failures gracefully: Monitor `node_failed` and `run_failed` events to handle errors appropriately
Next Steps
- Streaming - Real-time response streaming
- Tool Use - Define and use tools with LLMs
- Customer Tokens - Scope runs to customers