Workflows

Workflows let you define multi-step AI pipelines as declarative specs. Instead of writing imperative code to orchestrate multiple LLM calls, you define a directed acyclic graph (DAG) of nodes and let ModelRelay handle execution, parallelism, and state management.

Why Workflows?

flowchart LR
    A[Input] --> B[Node 1]
    A --> C[Node 2]
    B --> D[Node 3]
    C --> D
    D --> E[Output]

Without workflows: You write imperative code to call multiple models, manage intermediate state, handle errors, and coordinate parallel execution.

With workflows: You declare what you want (nodes, connections, outputs) and the runtime handles how to execute it efficiently.

Benefits:

  • Parallel execution: Independent nodes run concurrently up to max_parallelism
  • Automatic retries: Failed nodes can be retried with exponential backoff
  • Event streaming: Monitor progress in real time via the event stream
  • Idempotent runs: Repeated requests with the same idempotency key return cached results
  • Cost tracking: Aggregated usage across all nodes

Quick Start

Define a workflow spec and run it:

import { ModelRelay, WorkflowKinds, WorkflowNodeTypes } from "@modelrelay/sdk";

const mr = ModelRelay.fromSecretKey(process.env.MODELRELAY_API_KEY!);

// Define a simple workflow with one LLM node
const spec = {
  kind: WorkflowKinds.WorkflowV0,
  name: "greeting-workflow",
  nodes: [
    {
      id: "greeter",
      type: WorkflowNodeTypes.LLMResponses,
      input: {
        request: {
          model: "claude-sonnet-4-20250514",
          system: "You are a friendly assistant.",
          input: [{ role: "user", content: "Say hello!" }],
        },
      },
    },
  ],
  outputs: [
    { name: "greeting", from: "greeter" },
  ],
};

// Start the run
const { run_id } = await mr.runs.create(spec);

// Stream events until completion
for await (const event of await mr.runs.events(run_id)) {
  if (event.type === "node_output_delta" && event.delta.text_delta) {
    process.stdout.write(event.delta.text_delta);
  }
  if (event.type === "run_completed") {
    console.log("\nRun completed!");
  }
}
ctx := context.Background()

// Define a simple workflow with one LLM node
spec := sdk.WorkflowV0().
    Name("greeting-workflow").
    Node(sdk.LLMResponsesNode("greeter", sdk.LLMResponsesInput{
        Request: sdk.ResponsesRequest{
            Model:  sdk.NewModelID("claude-sonnet-4-20250514"),
            System: "You are a friendly assistant.",
            Input:  []sdk.InputItem{{Role: "user", Content: "Say hello!"}},
        },
    })).
    Output("greeting", "greeter").
    Build()

// Start the run
result, err := client.Runs.Create(ctx, spec)
if err != nil {
    log.Fatal(err)
}

// Stream events until completion
stream, err := client.Runs.StreamEvents(ctx, result.RunID)
if err != nil {
    log.Fatal(err)
}
defer stream.Close()

for {
    event, ok, err := stream.Next()
    if err != nil {
        log.Fatal(err)
    }
    if !ok {
        break
    }
    switch ev := event.(type) {
    case *sdk.NodeOutputDeltaEvent:
        if ev.Delta.TextDelta != "" {
            fmt.Print(ev.Delta.TextDelta)
        }
    case *sdk.RunCompletedEvent:
        fmt.Println("\nRun completed!")
    }
}
use futures_util::StreamExt;
use modelrelay::{Client, WorkflowBuilder, RunEvent};

let client = Client::from_api_key(std::env::var("MODELRELAY_API_KEY")?)?
    .build()?;

// Define a simple workflow with one LLM node
let spec = WorkflowBuilder::new()
    .name("greeting-workflow")
    .llm_responses_node("greeter", |node| {
        node.model("claude-sonnet-4-20250514")
            .system("You are a friendly assistant.")
            .user("Say hello!")
    })
    .output("greeting", "greeter")
    .build();

// Start the run
let result = client.runs().create(&spec).await?;

// Stream events until completion
let mut stream = client.runs().stream_events(&result.run_id).await?;

while let Some(event) = stream.next().await {
    let event = event?;
    match event {
        RunEvent::NodeOutputDelta { delta, .. } => {
            if let Some(text) = delta.text_delta {
                print!("{}", text);
            }
        }
        RunEvent::RunCompleted { .. } => {
            println!("\nRun completed!");
        }
        _ => {}
    }
}

Workflow Spec Format

A workflow spec is a JSON document with this structure:

{
  "kind": "workflow.v0",
  "name": "my-workflow",
  "execution": {
    "max_parallelism": 4,
    "node_timeout_ms": 60000,
    "run_timeout_ms": 300000
  },
  "nodes": [...],
  "edges": [...],
  "outputs": [...]
}

| Field | Type | Description |
| --- | --- | --- |
| kind | string | Must be "workflow.v0" |
| name | string | Optional workflow name for identification |
| execution | object | Optional execution configuration |
| nodes | array | List of nodes (required) |
| edges | array | Explicit edges between nodes (optional) |
| outputs | array | Values to extract from completed nodes (required) |

Execution Config

| Field | Default | Description |
| --- | --- | --- |
| max_parallelism | 8 | Maximum concurrent node executions |
| node_timeout_ms | 60000 | Timeout for individual node execution |
| run_timeout_ms | 300000 | Timeout for the entire workflow run |
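
A hedged TypeScript sketch of overriding these limits (the specific values are illustrative, not recommendations):

// Illustrative limits: at most two concurrent nodes, 30s per node, 2 minutes per run.
const execution = {
  max_parallelism: 2,
  node_timeout_ms: 30_000,
  run_timeout_ms: 120_000,
};

// Attach it to a spec alongside nodes and outputs:
// { kind: WorkflowKinds.WorkflowV0, execution, nodes: [...], outputs: [...] }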

Node Types

llm.responses

Makes an LLM API call. This is the primary node type for AI inference.

{
  "id": "analyzer",
  "type": "llm.responses",
  "input": {
    "request": {
      "model": "claude-sonnet-4-20250514",
      "system": "You are a helpful assistant.",
      "input": [
        { "role": "user", "content": "Analyze this data..." }
      ],
      "max_output_tokens": 1024,
      "temperature": 0.7
    },
    "stream": true,
    "tool_execution": { "mode": "server" },
    "tool_limits": {
      "max_llm_calls": 10,
      "max_tool_calls_per_step": 5
    },
    "bindings": [...]
  }
}

| Field | Description |
| --- | --- |
| request | Standard responses request (model, system, input, tools, etc.) |
| stream | Enable streaming for real-time output deltas |
| tool_execution | Tool execution mode: server (auto-execute) or client (wait for results) |
| tool_limits | Limits on tool execution loops |
| bindings | Inject upstream outputs into the request |

join.all

Synchronization point that waits for all upstream nodes to complete before allowing downstream nodes to start.

{
  "id": "sync",
  "type": "join.all"
}

Use join.all when you need to collect results from parallel branches before continuing.

transform.json

Transforms JSON outputs from upstream nodes into a new structure.

{
  "id": "combiner",
  "type": "transform.json",
  "input": {
    "object": {
      "analysis": { "from": "analyzer", "pointer": "/content" },
      "summary": { "from": "summarizer", "pointer": "/content" }
    }
  }
}

| Field | Description |
| --- | --- |
| object | Build a new object from upstream outputs |
| merge | Merge multiple upstream outputs |

The pointer field uses JSON Pointer syntax (RFC 6901) to extract specific values.
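
For instance, assuming an upstream llm.responses node exposes an output artifact shaped like the object below (the exact shape is illustrative), a pointer selects a single value from it:

// Hypothetical output artifact from the "analyzer" node.
const analyzerOutput = {
  content: "Revenue grew 12% quarter over quarter.",
  usage: { input_tokens: 812, output_tokens: 96 },
};

// Pointer "/content" resolves to "Revenue grew 12% quarter over quarter."
// A deeper pointer such as "/usage/output_tokens" resolves to 96.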

Edges

Edges define dependencies between nodes. If node B depends on node A’s output, add an edge:

{
  "edges": [
    { "from": "analyzer", "to": "summarizer" }
  ]
}

Implicit edges: When using bindings in an llm.responses node, edges are inferred automatically from the binding sources.

Explicit edges: Use for join.all nodes or to enforce execution order without data flow.
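
As a sketch of the implicit case, the spec below declares no edges array at all; because summarizer binds to analyzer, the runtime infers the analyzer → summarizer dependency (node shapes follow the Quick Start TypeScript example):

const spec = {
  kind: WorkflowKinds.WorkflowV0,
  nodes: [
    {
      id: "analyzer",
      type: WorkflowNodeTypes.LLMResponses,
      input: {
        request: {
          model: "claude-sonnet-4-20250514",
          input: [{ role: "user", content: "Analyze this quarter's metrics..." }],
        },
      },
    },
    {
      id: "summarizer",
      type: WorkflowNodeTypes.LLMResponses,
      input: {
        request: {
          model: "claude-sonnet-4-20250514",
          input: [{ role: "user", content: "Summarize: {{analysis}}" }],
        },
        // This binding is what creates the implied analyzer -> summarizer edge.
        bindings: [{ from: "analyzer", to: "analysis", pointer: "/content" }],
      },
    },
  ],
  outputs: [{ name: "summary", from: "summarizer" }],
};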

Outputs

Outputs define what values to extract from completed nodes:

{
  "outputs": [
    { "name": "result", "from": "analyzer" },
    { "name": "summary", "from": "summarizer", "pointer": "/content" }
  ]
}

| Field | Description |
| --- | --- |
| name | Key for this output in the final result |
| from | Node ID to extract from |
| pointer | Optional JSON Pointer to extract a specific value |
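
After the run completes, these values come back keyed by their declared names. A minimal sketch, assuming a run created as in the Quick Start and the Get Run Status API shown later on this page:

const run = await mr.runs.get(run_id);
if (run.status === "succeeded") {
  console.log(run.outputs?.result);   // full output of the "analyzer" node
  console.log(run.outputs?.summary);  // just the value behind the "/content" pointer
}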

Bindings

Bindings inject upstream node outputs into downstream request templates:

{
  "id": "synthesizer",
  "type": "llm.responses",
  "input": {
    "request": {
      "model": "claude-sonnet-4-20250514",
      "system": "Synthesize the following analyses.",
      "input": [
        { "role": "user", "content": "Analysis A: {{analysis_a}}\n\nAnalysis B: {{analysis_b}}" }
      ]
    },
    "bindings": [
      { "from": "agent_a", "to": "analysis_a" },
      { "from": "agent_b", "to": "analysis_b", "pointer": "/content" }
    ]
  }
}

| Field | Description |
| --- | --- |
| from | Source node ID |
| to | Placeholder name in the request (use {{name}} in content) |
| pointer | Optional JSON Pointer for nested extraction |
| encoding | Optional: json (default) or json_string |
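
Concretely, each {{placeholder}} in the request content is replaced with its bound value before the request is sent. A hedged sketch of that substitution (an illustration of what the runtime does, not SDK code, and it assumes both upstream nodes expose their text under /content):

// Values extracted from upstream outputs via pointer "/content".
const bound: Record<string, string> = {
  analysis_a: "Strong technical fundamentals.",
  analysis_b: "Market risk is moderate.",
};

// The template from the request above, resolved.
const template = "Analysis A: {{analysis_a}}\n\nAnalysis B: {{analysis_b}}";
const resolved = template.replace(/\{\{(\w+)\}\}/g, (_, name) => bound[name] ?? "");
// => "Analysis A: Strong technical fundamentals.\n\nAnalysis B: Market risk is moderate."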

Multi-Agent Patterns

Parallel Analysis

Run multiple specialized agents in parallel, then synthesize results:

flowchart LR
    subgraph parallel["Parallel Execution"]
        T[Technical Analyst]
        B[Business Analyst]
        R[Risk Analyst]
    end
    T --> J[Join]
    B --> J
    R --> J
    J --> S[Synthesizer]
    S --> O[Final Report]
const spec = {
  kind: WorkflowKinds.WorkflowV0,
  name: "parallel-analysis",
  execution: { max_parallelism: 3 },
  nodes: [
    // Three parallel agents
    {
      id: "technical",
      type: WorkflowNodeTypes.LLMResponses,
      input: {
        request: {
          model: "claude-sonnet-4-20250514",
          system: "You are a technical analyst. Focus on implementation details.",
          input: [{ role: "user", content: "Analyze: {{topic}}" }],
        },
      },
    },
    {
      id: "business",
      type: WorkflowNodeTypes.LLMResponses,
      input: {
        request: {
          model: "claude-sonnet-4-20250514",
          system: "You are a business analyst. Focus on market impact.",
          input: [{ role: "user", content: "Analyze: {{topic}}" }],
        },
      },
    },
    {
      id: "risk",
      type: WorkflowNodeTypes.LLMResponses,
      input: {
        request: {
          model: "claude-sonnet-4-20250514",
          system: "You are a risk analyst. Focus on potential issues.",
          input: [{ role: "user", content: "Analyze: {{topic}}" }],
        },
      },
    },
    // Synchronization point
    {
      id: "join",
      type: WorkflowNodeTypes.JoinAll,
    },
    // Synthesis agent that combines all analyses
    {
      id: "synthesizer",
      type: WorkflowNodeTypes.LLMResponses,
      input: {
        request: {
          model: "claude-sonnet-4-20250514",
          system: "Synthesize these analyses into a comprehensive report.",
          input: [{
            role: "user",
            content: "Technical: {{technical}}\n\nBusiness: {{business}}\n\nRisk: {{risk}}",
          }],
        },
        bindings: [
          { from: "technical", to: "technical" },
          { from: "business", to: "business" },
          { from: "risk", to: "risk" },
        ],
      },
    },
  ],
  edges: [
    { from: "technical", to: "join" },
    { from: "business", to: "join" },
    { from: "risk", to: "join" },
    { from: "join", to: "synthesizer" },
  ],
  outputs: [
    { name: "report", from: "synthesizer" },
  ],
};
spec := sdk.WorkflowV0().
    Name("parallel-analysis").
    Execution(sdk.ExecutionConfig{MaxParallelism: 3}).
    // Three parallel agents
    Node(sdk.LLMResponsesNode("technical", sdk.LLMResponsesInput{
        Request: sdk.ResponsesRequest{
            Model:  sdk.NewModelID("claude-sonnet-4-20250514"),
            System: "You are a technical analyst. Focus on implementation details.",
            Input:  []sdk.InputItem{{Role: "user", Content: "Analyze: {{topic}}"}},
        },
    })).
    Node(sdk.LLMResponsesNode("business", sdk.LLMResponsesInput{
        Request: sdk.ResponsesRequest{
            Model:  sdk.NewModelID("claude-sonnet-4-20250514"),
            System: "You are a business analyst. Focus on market impact.",
            Input:  []sdk.InputItem{{Role: "user", Content: "Analyze: {{topic}}"}},
        },
    })).
    Node(sdk.LLMResponsesNode("risk", sdk.LLMResponsesInput{
        Request: sdk.ResponsesRequest{
            Model:  sdk.NewModelID("claude-sonnet-4-20250514"),
            System: "You are a risk analyst. Focus on potential issues.",
            Input:  []sdk.InputItem{{Role: "user", Content: "Analyze: {{topic}}"}},
        },
    })).
    // Synchronization point
    Node(sdk.JoinAllNode("join")).
    // Synthesis agent
    Node(sdk.LLMResponsesNodeWithBindings("synthesizer", sdk.LLMResponsesInput{
        Request: sdk.ResponsesRequest{
            Model:  sdk.NewModelID("claude-sonnet-4-20250514"),
            System: "Synthesize these analyses into a comprehensive report.",
            Input:  []sdk.InputItem{{Role: "user", Content: "Technical: {{technical}}\n\nBusiness: {{business}}\n\nRisk: {{risk}}"}},
        },
    }, []sdk.Binding{
        {From: "technical", To: "technical"},
        {From: "business", To: "business"},
        {From: "risk", To: "risk"},
    })).
    // Edges
    Edge("technical", "join").
    Edge("business", "join").
    Edge("risk", "join").
    Edge("join", "synthesizer").
    // Output
    Output("report", "synthesizer").
    Build()
use modelrelay::WorkflowBuilder;

let spec = WorkflowBuilder::new()
    .name("parallel-analysis")
    .max_parallelism(3)
    // Three parallel agents
    .llm_responses_node("technical", |node| {
        node.model("claude-sonnet-4-20250514")
            .system("You are a technical analyst. Focus on implementation details.")
            .user("Analyze: {{topic}}")
    })
    .llm_responses_node("business", |node| {
        node.model("claude-sonnet-4-20250514")
            .system("You are a business analyst. Focus on market impact.")
            .user("Analyze: {{topic}}")
    })
    .llm_responses_node("risk", |node| {
        node.model("claude-sonnet-4-20250514")
            .system("You are a risk analyst. Focus on potential issues.")
            .user("Analyze: {{topic}}")
    })
    // Synchronization point
    .join_all_node("join")
    // Synthesis agent
    .llm_responses_node_with_bindings(
        "synthesizer",
        |node| {
            node.model("claude-sonnet-4-20250514")
                .system("Synthesize these analyses into a comprehensive report.")
                .user("Technical: {{technical}}\n\nBusiness: {{business}}\n\nRisk: {{risk}}")
        },
        vec![
            ("technical", "technical"),
            ("business", "business"),
            ("risk", "risk"),
        ],
    )
    // Edges
    .edge("technical", "join")
    .edge("business", "join")
    .edge("risk", "join")
    .edge("join", "synthesizer")
    // Output
    .output("report", "synthesizer")
    .build();

Monitoring Runs

Event Streaming

Stream events to monitor run progress in real-time:

const { run_id } = await mr.runs.create(spec);

// Stream all events
for await (const event of await mr.runs.events(run_id)) {
  switch (event.type) {
    case "run_started":
      console.log("Run started");
      break;

    case "node_started":
      console.log(`Node ${event.node_id} started`);
      break;

    case "node_output_delta":
      // Real-time text streaming from LLM nodes
      if (event.delta.text_delta) {
        process.stdout.write(event.delta.text_delta);
      }
      break;

    case "node_llm_call":
      console.log(`LLM call: ${event.llm_call.model}, usage: ${JSON.stringify(event.llm_call.usage)}`);
      break;

    case "node_tool_call":
      console.log(`Tool call: ${event.tool_call.tool_call.name}`);
      break;

    case "node_succeeded":
      console.log(`Node ${event.node_id} succeeded`);
      break;

    case "node_failed":
      console.error(`Node ${event.node_id} failed: ${event.error.message}`);
      break;

    case "run_completed":
      console.log("Run completed!");
      break;

    case "run_failed":
      console.error(`Run failed: ${event.error.message}`);
      break;
  }
}
result, _ := client.Runs.Create(ctx, spec)

stream, _ := client.Runs.StreamEvents(ctx, result.RunID)
defer stream.Close()

for {
    event, ok, err := stream.Next()
    if err != nil {
        log.Fatal(err)
    }
    if !ok {
        break
    }

    switch ev := event.(type) {
    case *sdk.RunStartedEvent:
        fmt.Println("Run started")

    case *sdk.NodeStartedEvent:
        fmt.Printf("Node %s started\n", ev.NodeID)

    case *sdk.NodeOutputDeltaEvent:
        if ev.Delta.TextDelta != "" {
            fmt.Print(ev.Delta.TextDelta)
        }

    case *sdk.NodeLLMCallEvent:
        fmt.Printf("LLM call: %s, usage: %+v\n", ev.LLMCall.Model, ev.LLMCall.Usage)

    case *sdk.NodeToolCallEvent:
        fmt.Printf("Tool call: %s\n", ev.ToolCall.ToolCall.Name)

    case *sdk.NodeSucceededEvent:
        fmt.Printf("Node %s succeeded\n", ev.NodeID)

    case *sdk.NodeFailedEvent:
        fmt.Printf("Node %s failed: %s\n", ev.NodeID, ev.Error.Message)

    case *sdk.RunCompletedEvent:
        fmt.Println("Run completed!")

    case *sdk.RunFailedEvent:
        fmt.Printf("Run failed: %s\n", ev.Error.Message)
    }
}
use futures_util::StreamExt;
use modelrelay::RunEvent;

let result = client.runs().create(&spec).await?;

let mut stream = client.runs().stream_events(&result.run_id).await?;

while let Some(event) = stream.next().await {
    let event = event?;
    match event {
        RunEvent::RunStarted { .. } => {
            println!("Run started");
        }
        RunEvent::NodeStarted { node_id, .. } => {
            println!("Node {} started", node_id);
        }
        RunEvent::NodeOutputDelta { delta, .. } => {
            if let Some(text) = delta.text_delta {
                print!("{}", text);
            }
        }
        RunEvent::NodeLLMCall { llm_call, .. } => {
            println!("LLM call: {}, usage: {:?}", llm_call.model, llm_call.usage);
        }
        RunEvent::NodeToolCall { tool_call, .. } => {
            println!("Tool call: {}", tool_call.tool_call.name);
        }
        RunEvent::NodeSucceeded { node_id, .. } => {
            println!("Node {} succeeded", node_id);
        }
        RunEvent::NodeFailed { node_id, error, .. } => {
            eprintln!("Node {} failed: {}", node_id, error.message);
        }
        RunEvent::RunCompleted { .. } => {
            println!("Run completed!");
        }
        RunEvent::RunFailed { error, .. } => {
            eprintln!("Run failed: {}", error.message);
        }
        _ => {}
    }
}

Event Types

| Event | Description |
| --- | --- |
| run_compiled | Workflow spec validated and execution plan created |
| run_started | Run execution began |
| run_completed | Run finished successfully, outputs available |
| run_failed | Run failed with an error |
| run_canceled | Run was canceled |
| node_started | Node execution began |
| node_succeeded | Node completed successfully |
| node_failed | Node failed with an error |
| node_llm_call | LLM API call completed (includes usage) |
| node_tool_call | Tool call initiated |
| node_tool_result | Tool execution result received |
| node_waiting | Node waiting for client-side tool results |
| node_output_delta | Streaming text delta from an LLM |
| node_output | Node output artifact available |

Get Run Status

Poll run status without streaming:

const run = await mr.runs.get(run_id);

console.log("Status:", run.status);  // running | succeeded | failed | canceled
console.log("Outputs:", run.outputs);
console.log("Cost:", run.cost);

// Check individual node status
for (const node of run.nodes ?? []) {
  console.log(`${node.id}: ${node.status}`);
}
run, err := client.Runs.Get(ctx, runID)
if err != nil {
    log.Fatal(err)
}

fmt.Println("Status:", run.Status)
fmt.Println("Outputs:", run.Outputs)
fmt.Println("Cost:", run.Cost)

for _, node := range run.Nodes {
    fmt.Printf("%s: %s\n", node.ID, node.Status)
}
let run = client.runs().get(&run_id).await?;

println!("Status: {:?}", run.status);  // Running | Succeeded | Failed | Canceled
println!("Outputs: {:?}", run.outputs);
println!("Cost: {:?}", run.cost);

// Check individual node status
if let Some(nodes) = &run.nodes {
    for node in nodes {
        println!("{}: {:?}", node.id, node.status);
    }
}

Tool Execution Modes

Server-Side (Default)

Tools execute automatically on the server. The workflow engine handles the tool loop internally.

{
  "tool_execution": { "mode": "server" }
}
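
A hedged TypeScript sketch of a full server-mode node, combining the fields already listed under the llm.responses node type (the tool-loop limits are illustrative):

// Server-side loop: the engine executes tool calls and feeds results back to the
// model until it produces a final answer or hits the configured limits.
const agentNode = {
  id: "agent",
  type: WorkflowNodeTypes.LLMResponses,
  input: {
    request: {
      model: "claude-sonnet-4-20250514",
      system: "You can use tools to help users.",
      input: [{ role: "user", content: "Look this up for me." }],
      tools: [/* tool definitions, as in the client-side example below */],
    },
    stream: true,
    tool_execution: { mode: "server" },
    tool_limits: { max_llm_calls: 10, max_tool_calls_per_step: 5 },
  },
};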

Client-Side

Tools wait for your code to provide results. Use this when tools require local resources or user interaction.

const spec = {
  kind: WorkflowKinds.WorkflowV0,
  nodes: [
    {
      id: "agent",
      type: WorkflowNodeTypes.LLMResponses,
      input: {
        request: {
          model: "claude-sonnet-4-20250514",
          system: "You can use tools to help users.",
          input: [{ role: "user", content: "What's the weather in Tokyo?" }],
          tools: [
            {
              type: "function",
              function: {
                name: "get_weather",
                description: "Get weather for a city",
                parameters: {
                  type: "object",
                  properties: { city: { type: "string" } },
                  required: ["city"],
                },
              },
            },
          ],
        },
        tool_execution: { mode: "client" },
        tool_limits: { wait_ttl_ms: 60000 },
      },
    },
  ],
  outputs: [{ name: "response", from: "agent" }],
};

const { run_id } = await mr.runs.create(spec);

// Monitor for waiting state
for await (const event of await mr.runs.events(run_id)) {
  if (event.type === "node_waiting") {
    // Execute tools locally
    const toolResults = await Promise.all(
      event.waiting.pending_tool_calls.map(async (call) => {
        const args = JSON.parse(call.arguments);
        // Execute your tool logic
        const result = await getWeather(args.city);
        return {
          tool_call_id: call.tool_call_id,
          output: JSON.stringify(result),
        };
      })
    );

    // Submit results to continue the run
    await mr.runs.submitToolResults(run_id, {
      node_id: event.node_id,
      tool_results: toolResults,
    });
  }

  if (event.type === "run_completed") {
    break;
  }
}
spec := sdk.WorkflowV0().
    Node(sdk.LLMResponsesNode("agent", sdk.LLMResponsesInput{
        Request: sdk.ResponsesRequest{
            Model:  sdk.NewModelID("claude-sonnet-4-20250514"),
            System: "You can use tools to help users.",
            Input:  []sdk.InputItem{{Role: "user", Content: "What's the weather in Tokyo?"}},
            Tools: []sdk.Tool{{
                Type: "function",
                Function: sdk.ToolFunction{
                    Name:        "get_weather",
                    Description: "Get weather for a city",
                    Parameters: map[string]any{
                        "type": "object",
                        "properties": map[string]any{
                            "city": map[string]any{"type": "string"},
                        },
                        "required": []string{"city"},
                    },
                },
            }},
        },
        ToolExecution: &sdk.ToolExecution{Mode: "client"},
        ToolLimits:    &sdk.ToolLimits{WaitTTLMs: 60000},
    })).
    Output("response", "agent").
    Build()

result, _ := client.Runs.Create(ctx, spec)
stream, _ := client.Runs.StreamEvents(ctx, result.RunID)

for {
    event, ok, _ := stream.Next()
    if !ok {
        break
    }

    if ev, ok := event.(*sdk.NodeWaitingEvent); ok {
        var toolResults []sdk.ToolResult
        for _, call := range ev.Waiting.PendingToolCalls {
            var args struct{ City string }
            json.Unmarshal([]byte(call.Arguments), &args)

            weather := getWeather(args.City)
            toolResults = append(toolResults, sdk.ToolResult{
                ToolCallID: call.ToolCallID,
                Output:     weather,
            })
        }

        client.Runs.SubmitToolResults(ctx, result.RunID, sdk.ToolResultsRequest{
            NodeID:      ev.NodeID,
            ToolResults: toolResults,
        })
    }

    if _, ok := event.(*sdk.RunCompletedEvent); ok {
        break
    }
}
use futures_util::StreamExt;
use modelrelay::{WorkflowBuilder, RunEvent, ToolResult, ToolResultsRequest};
use serde::Deserialize;
use serde_json::json;

let spec = WorkflowBuilder::new()
    .llm_responses_node("agent", |node| {
        node.model("claude-sonnet-4-20250514")
            .system("You can use tools to help users.")
            .user("What's the weather in Tokyo?")
            .tool("get_weather", "Get weather for a city", json!({
                "type": "object",
                "properties": { "city": { "type": "string" } },
                "required": ["city"]
            }))
            .tool_execution_client()
            .wait_ttl_ms(60000)
    })
    .output("response", "agent")
    .build();

let result = client.runs().create(&spec).await?;
let mut stream = client.runs().stream_events(&result.run_id).await?;

while let Some(event) = stream.next().await {
    let event = event?;
    match event {
        RunEvent::NodeWaiting { node_id, waiting, .. } => {
            #[derive(Deserialize)]
            struct Args { city: String }

            let mut tool_results = Vec::new();
            for call in waiting.pending_tool_calls {
                let args: Args = serde_json::from_str(&call.arguments)?;
                let weather = get_weather(&args.city).await;
                tool_results.push(ToolResult {
                    tool_call_id: call.tool_call_id,
                    output: weather,
                });
            }

            client.runs().submit_tool_results(
                &result.run_id,
                ToolResultsRequest {
                    node_id,
                    tool_results,
                },
            ).await?;
        }
        RunEvent::RunCompleted { .. } => break,
        _ => {}
    }
}

Idempotency

Use idempotency keys to safely retry requests:

const { run_id } = await mr.runs.create(spec, {
  idempotencyKey: "order-123-analysis",
});

If a run with the same idempotency key already exists and completed successfully, the cached result is returned. This prevents duplicate processing when you retry a request whose outcome is unclear (for example, after a timeout or network error).

Best Practices

  1. Use meaningful node IDs: Names like technical_analyst are clearer than node1

  2. Set appropriate timeouts: Configure node_timeout_ms and run_timeout_ms based on expected execution time

  3. Limit parallelism: Don’t set max_parallelism higher than necessary; higher values increase resource usage

  4. Enable streaming: Set stream: true on LLM nodes for real-time progress visibility (see the sketch after this list)

  5. Use join.all for synchronization: When multiple branches need to complete before continuing, use a join.all node

  6. Extract specific values: Use JSON Pointer in bindings and outputs to extract exactly what you need

  7. Handle failures gracefully: Monitor node_failed and run_failed events to handle errors appropriately
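
Putting several of these together, a hedged sketch of a small spec that bounds execution, enables streaming, and extracts only the text it needs (values and prompts are illustrative):

const spec = {
  kind: WorkflowKinds.WorkflowV0,
  name: "status-report",
  // Practices 2 and 3: explicit, bounded timeouts and parallelism.
  execution: { max_parallelism: 2, node_timeout_ms: 45_000, run_timeout_ms: 180_000 },
  nodes: [
    {
      id: "report_writer",                  // Practice 1: a meaningful node ID
      type: WorkflowNodeTypes.LLMResponses,
      input: {
        request: {
          model: "claude-sonnet-4-20250514",
          input: [{ role: "user", content: "Draft a short status report." }],
        },
        stream: true,                       // Practice 4: real-time output deltas
      },
    },
  ],
  // Practice 6: extract exactly the value you need with a JSON Pointer.
  outputs: [{ name: "report_text", from: "report_writer", pointer: "/content" }],
};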

Next Steps