Workflows

Workflows let you define multi-step AI pipelines as declarative specs. Instead of writing imperative code to orchestrate multiple LLM calls, you define a directed acyclic graph (DAG) of nodes and let ModelRelay handle execution, parallelism, and state management.

ModelRelay’s workflow format uses concise {{placeholders}} for data binding between nodes.

Why Workflows?

flowchart LR
    A[Input] --> B[Node 1]
    A --> C[Node 2]
    B --> D[Node 3]
    C --> D
    D --> E[Output]

Without workflows: You write imperative code to call multiple models, manage intermediate state, handle errors, and coordinate parallel execution.

With workflows: You declare what you want (nodes, dependencies, outputs) and the runtime handles how to execute it efficiently.

Benefits:

  • Parallel execution: Independent nodes run concurrently.
  • Retry handling: The runtime can retry failed steps where appropriate.
  • Event streaming: Monitor progress in real time via run events.
  • Idempotent runs: Repeat requests can return cached results.
  • Cost tracking: Usage is aggregated across all nodes.
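To make the parallel-execution benefit concrete, here is a minimal sketch of how a runtime could group the nodes of a DAG into waves of independent work. The executionWaves helper and the node names are hypothetical illustrations, not part of the ModelRelay SDK, and the real runtime may schedule work differently.

```typescript
// Hypothetical helper (not a ModelRelay API): groups nodes into "waves".
// Every node in a wave has all of its dependencies satisfied by earlier
// waves, so the nodes within one wave could run concurrently.
function executionWaves(deps: Record<string, string[]>): string[][] {
  const done: Record<string, boolean> = {};
  let pending = Object.keys(deps);
  const waves: string[][] = [];

  while (pending.length > 0) {
    // A node is ready once every one of its dependencies has completed.
    const ready = pending.filter((id) => deps[id].every((d) => done[d] === true));
    if (ready.length === 0) {
      throw new Error("cycle or unknown dependency: the graph is not a DAG");
    }
    ready.forEach((id) => {
      done[id] = true;
    });
    pending = pending.filter((id) => !done[id]);
    waves.push(ready.sort());
  }
  return waves;
}

// The graph from the diagram: Node 1 and Node 2 depend only on the input,
// while Node 3 needs both of them.
const waves = executionWaves({
  node1: [],
  node2: [],
  node3: ["node1", "node2"],
});
console.log(JSON.stringify(waves)); // [["node1","node2"],["node3"]]
```

Node 1 and Node 2 land in the same wave, which is the situation where concurrent execution pays off.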

Quick Start

Sequential Chain

Chain steps together with automatic edge wiring:

TypeScript:

import { ModelRelay, chain, llm } from "@modelrelay/sdk";

const mr = ModelRelay.fromSecretKey(process.env.MODELRELAY_API_KEY!);

const spec = chain([
  llm("summarize", (n) => n.system("Summarize.").user("{{task}}")),
  llm("translate", (n) => n.system("Translate to French.").user("{{summarize}}")),
], { name: "summarize-translate", model: "claude-sonnet-4-5" })
  .output("result", "translate")
  .build();

const { run_id } = await mr.runs.create(spec, {
  input: { task: "Explain how photosynthesis works" },
});

Go:

spec, _ := sdk.Chain([]sdk.WorkflowIntentNode{
    sdk.LLM("summarize", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
        return n.System("Summarize.").User("{{task}}")
    }),
    sdk.LLM("translate", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
        return n.System("Translate to French.").User("{{summarize}}")
    }),
}, sdk.ChainOptions{Name: "summarize-translate", Model: "claude-sonnet-4-5"}).
    Output("result", "translate").
    Build()

created, _ := client.Runs.Create(ctx, spec, sdk.WithRunInputs(map[string]any{
    "task": "Explain how photosynthesis works",
}))

Rust:

use modelrelay::{chain, llm, ChainOptions, RunsCreateOptions};
use serde_json::json;

let spec = chain(
    vec![
        llm("summarize", |n| n.system("Summarize.").user("{{task}}")),
        llm("translate", |n| n.system("Translate to French.").user("{{summarize}}")),
    ],
    ChainOptions { name: Some("summarize-translate".into()), model: Some("claude-sonnet-4-5".into()), ..Default::default() },
)
.output("result", "translate", None)
.build()?;

let run = client.runs().create(spec, RunsCreateOptions {
    input: Some(json!({ "task": "Explain how photosynthesis works" })),
    ..Default::default()
}).await?;

Parallel with Aggregation

Fan out to multiple agents, then aggregate their outputs:

TypeScript:

import { parallel, llm } from "@modelrelay/sdk";

const spec = parallel([
  llm("agent_a", (n) => n.user("Write 3 ideas for {{task}}.")),
  llm("agent_b", (n) => n.user("Write 3 objections for {{task}}.")),
], { name: "multi-agent", model: "claude-sonnet-4-5" })
  .llm("aggregate", (n) => n.system("Synthesize.").user("{{join}}"))
  .edge("join", "aggregate")
  .output("result", "aggregate")
  .build();

Go:

spec, _ := sdk.Parallel([]sdk.WorkflowIntentNode{
    sdk.LLM("agent_a", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
        return n.User("Write 3 ideas for {{task}}.")
    }),
    sdk.LLM("agent_b", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
        return n.User("Write 3 objections for {{task}}.")
    }),
}, sdk.ParallelOptions{Name: "multi-agent", Model: "claude-sonnet-4-5"}).
    LLM("aggregate", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
        return n.System("Synthesize.").User("{{join}}")
    }).
    Edge("join", "aggregate").
    Output("result", "aggregate").
    Build()

Rust:

use modelrelay::{parallel, llm, ParallelOptions};

let spec = parallel(
    vec![
        llm("agent_a", |n| n.user("Write 3 ideas for {{task}}.")),
        llm("agent_b", |n| n.user("Write 3 objections for {{task}}.")),
    ],
    ParallelOptions { name: Some("multi-agent".into()), model: Some("claude-sonnet-4-5".into()), ..Default::default() },
)
.llm("aggregate", |n| n.system("Synthesize.").user("{{join}}"))
.edge("join", "aggregate")
.output("result", "aggregate", None)
.build()?;

Workflow Spec

Top-Level Fields

  • kind: must be workflow
  • name: optional label
  • model: optional default model for all LLM nodes
  • max_parallelism: optional max concurrent node executions (default: 4)
  • nodes: list of nodes
  • outputs: named outputs returned by the run

Node Types

A workflow supports the following node types:

  • llm
  • join.all
  • join.any
  • join.collect
  • transform.json
  • map.fanout
  • agent.run
  • agent.scheduler
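The top-level fields and node types listed above can be written down as TypeScript types, together with a small client-side sanity check. This is only a sketch: the interface names and the checkSpec function are hypothetical, the rules it enforces (unique node IDs, outputs pointing at existing nodes) are assumptions inferred from the examples on this page, and the server remains the authority on what counts as a valid spec.

```typescript
// Node types named in this document.
const NODE_TYPES: string[] = [
  "llm", "join.all", "join.any", "join.collect",
  "transform.json", "map.fanout", "agent.run", "agent.scheduler",
];

// Hypothetical shapes for a parsed spec (illustration only).
interface SpecNode {
  id: string;
  type: string;
}
interface SpecOutput {
  name: string;
  from: string;
}
interface ParsedSpec {
  kind: string;             // must be "workflow"
  name?: string;            // optional label
  model?: string;           // optional default model for LLM nodes
  max_parallelism?: number; // optional, documented default is 4
  nodes: SpecNode[];
  outputs: SpecOutput[];
}

// Hypothetical pre-flight check: returns a list of human-readable problems.
function checkSpec(spec: ParsedSpec): string[] {
  const problems: string[] = [];
  if (spec.kind !== "workflow") {
    problems.push('kind must be "workflow"');
  }
  const seen: Record<string, boolean> = {};
  spec.nodes.forEach((node) => {
    if (NODE_TYPES.indexOf(node.type) === -1) {
      problems.push("unknown node type: " + node.type);
    }
    if (seen[node.id]) {
      problems.push("duplicate node id: " + node.id); // assumed rule
    }
    seen[node.id] = true;
  });
  spec.outputs.forEach((out) => {
    if (!seen[out.from]) {
      problems.push('output "' + out.name + '" references unknown node "' + out.from + '"');
    }
  });
  return problems;
}

const problems = checkSpec({
  kind: "workflow",
  nodes: [{ id: "summarize", type: "llm" }],
  outputs: [{ name: "result", from: "translate" }],
});
console.log(problems);
```

Here the check reports that the output refers to a node named translate that the spec never defines.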

Placeholders

Use placeholders inside a node's user and system strings:

  • {{task}} binds to the run input named task.
  • {{node_id}} or {{node_id.output}} binds to another node’s output.
  • {{item}} is available inside map.fanout subnodes.

Placeholders create implicit dependencies. You can also set explicit dependencies with depends_on on any node.
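As an illustration of how placeholders can imply dependencies, the sketch below scans each node's strings for {{...}} references and keeps the ones that name another node. The function names and the placeholder grammar (letters, digits, and underscores, with optional dotted suffixes) are assumptions made for this example, not the parser ModelRelay actually uses.

```typescript
interface TemplatedNode {
  id: string;
  system?: string;
  user?: string;
}

// Returns the distinct placeholder roots in a template,
// e.g. "{{fanout.output}}" yields "fanout".
function placeholderRoots(template: string): string[] {
  const pattern = /\{\{\s*([A-Za-z0-9_]+)(?:\.[A-Za-z0-9_]+)*\s*\}\}/g;
  const roots: string[] = [];
  let match: RegExpExecArray | null;
  while ((match = pattern.exec(template)) !== null) {
    if (roots.indexOf(match[1]) === -1) {
      roots.push(match[1]);
    }
  }
  return roots;
}

// Maps each node ID to the node IDs its placeholders refer to.
// Names that match no node (such as {{task}} or {{item}}) are treated as
// run inputs or loop variables rather than dependencies.
function implicitDependencies(nodes: TemplatedNode[]): Record<string, string[]> {
  const ids = nodes.map((n) => n.id);
  const deps: Record<string, string[]> = {};
  nodes.forEach((node) => {
    const text = (node.system || "") + " " + (node.user || "");
    deps[node.id] = placeholderRoots(text).filter(
      (name) => ids.indexOf(name) !== -1 && name !== node.id,
    );
  });
  return deps;
}

const deps = implicitDependencies([
  { id: "summarize", system: "Summarize.", user: "{{task}}" },
  { id: "translate", system: "Translate to French.", user: "{{summarize}}" },
]);
console.log(JSON.stringify(deps)); // {"summarize":[],"translate":["summarize"]}
```

The translate node picks up a dependency on summarize purely from its user string, while {{task}} is left to be filled from the run input.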

Map Fan-Out Example

{
  "kind": "workflow",
  "name": "fanout",
  "model": "gpt-5.1",
  "nodes": [
    { "id": "generator", "type": "llm", "user": "Generate 3 subquestions for {{task}}" },
    {
      "id": "fanout",
      "type": "map.fanout",
      "items_from": "generator",
      "items_path": "/questions",
      "subnode": { "id": "answer", "type": "llm", "user": "Answer: {{item}}" },
      "max_parallelism": 4
    },
    { "id": "aggregate", "type": "llm", "user": "Combine: {{fanout.output}}" }
  ],
  "outputs": [{ "name": "final", "from": "aggregate" }]
}
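The items_path value above looks like a JSON Pointer. Assuming it follows RFC 6901 and that the generator node returns a JSON object with a questions array (neither is confirmed on this page), the sketch below shows how the fan-out items could be selected and substituted into the subnode's {{item}} placeholder. resolvePointer is a hypothetical helper written for this example.

```typescript
// Minimal RFC 6901 JSON Pointer lookup (illustrative helper, not SDK code).
function resolvePointer(doc: unknown, pointer: string): unknown {
  if (pointer === "") {
    return doc; // the empty pointer refers to the whole document
  }
  if (pointer.charAt(0) !== "/") {
    throw new Error("a JSON Pointer must be empty or start with '/'");
  }
  let current: unknown = doc;
  const tokens = pointer.slice(1).split("/");
  for (let i = 0; i < tokens.length; i++) {
    // Unescape per RFC 6901: "~1" means "/", "~0" means "~" (in that order).
    const key = tokens[i].replace(/~1/g, "/").replace(/~0/g, "~");
    if (current === null || typeof current !== "object") {
      return undefined; // the path runs past a scalar value
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Assumed shape of the generator node's output.
const generatorOutput: unknown = JSON.parse(
  '{"questions": ["What is light?", "What is chlorophyll?", "What is glucose?"]}',
);

// Select the items, then build one subnode prompt per item.
const items = resolvePointer(generatorOutput, "/questions");
const prompts = Array.isArray(items)
  ? items.map((item) => "Answer: {{item}}".replace("{{item}}", String(item)))
  : [];
console.log(prompts);
```

If the generator returns plain prose instead of JSON, a pointer lookup like this finds nothing, so the generator prompt should ask for structured output.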

Join & Aggregation Example

{
  "kind": "workflow",
  "name": "multi-agent",
  "model": "claude-sonnet-4-5",
  "nodes": [
    { "id": "agent_a", "type": "llm", "user": "Write 3 ideas for {{task}}." },
    { "id": "agent_b", "type": "llm", "user": "Write 3 objections for {{task}}." },
    { "id": "join", "type": "join.all" },
    { "id": "aggregate", "type": "llm", "user": "Synthesize: {{join}}" }
  ],
  "outputs": [{ "name": "result", "from": "aggregate" }]
}

Agent Scheduler

The agent.scheduler node enables multi-agent coordination loops within a workflow. While standard workflow nodes execute in a fixed DAG pattern, the scheduler can run multiple rounds of agent interactions with message-based coordination.

flowchart LR
    WF[Workflow DAG] --> SCH[agent.scheduler]
    SCH -->|spawn| A1[Agent 1]
    SCH -->|spawn| A2[Agent 2]
    A1 -->|messages| SCH
    A2 -->|messages| SCH
    SCH -->|coordinate| C[Coordinator]
    C -->|decision| SCH

When to Use

Use agent.scheduler when you need:

  • Multi-round coordination: Agents iterate until consensus or completion
  • Dynamic message routing: Coordinator decides who to involve based on context
  • Bounded loops: Policy constraints prevent runaway execution

For fixed pipelines (A→B→C), use standard workflow nodes instead.

Scheduler Spec

{
  "id": "review_loop",
  "type": "agent.scheduler",
  "agents": [
    {"agent": "coder@1", "count": 2, "input": [{"role": "user", "content": "Implement feature X"}]},
    {"agent": "reviewer@1", "count": 1, "input": [{"role": "user", "content": "Review implementations"}]}
  ],
  "coordinator": {
    "agent": "lead@1",
    "input": [{"role": "user", "content": "Coordinate the review process"}]
  },
  "policy": {
    "max_rounds": 5,
    "max_concurrency": 3,
    "max_steps": 50,
    "timeout_ms": 120000
  }
}

Scheduler Fields

Field                   Type    Description
agents                  array   Participant agents to spawn (agent slug, count, input)
coordinator             object  Optional coordinator agent that makes routing decisions
policy.max_rounds       int     Maximum coordination rounds
policy.max_concurrency  int     Maximum concurrent agent runs
policy.max_steps        int     Total step limit across all agents
policy.timeout_ms       int     Overall timeout in milliseconds

How It Works

  1. Spawn phase: Scheduler creates runs for each participant agent
  2. Coordination rounds:
    • Poll messages from agent mailboxes
    • Run coordinator with collected context
    • Coordinator decides: send messages, spawn more runs, or stop
  3. Termination: Stop when coordinator signals done or policy limits are reached
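The coordination rounds and termination rules above can be sketched as a bounded loop. This is a simplified, synchronous model under stated assumptions: the runScheduler function, the Decision shape, and the order in which limits are checked are all hypothetical, and the spawn phase and max_concurrency are left out for brevity.

```typescript
interface Policy {
  max_rounds: number;
  max_steps: number;
  timeout_ms: number;
}

// What the (simulated) coordinator reports after each round.
interface Decision {
  done: boolean;     // coordinator signals completion
  stepsUsed: number; // steps consumed by agents during this round
}

type StopReason = "coordinator_done" | "max_rounds" | "max_steps" | "timeout";

function runScheduler(
  policy: Policy,
  pollMessages: (round: number) => string[],
  coordinate: (round: number, messages: string[]) => Decision,
  now: () => number = Date.now,
): { rounds: number; steps: number; reason: StopReason } {
  const startedAt = now();
  let steps = 0;

  for (let round = 1; round <= policy.max_rounds; round++) {
    // Stop before starting a round if the overall timeout has elapsed.
    if (now() - startedAt >= policy.timeout_ms) {
      return { rounds: round - 1, steps: steps, reason: "timeout" };
    }
    const messages = pollMessages(round);          // poll agent mailboxes
    const decision = coordinate(round, messages);  // run the coordinator
    steps += decision.stepsUsed;
    if (decision.done) {
      return { rounds: round, steps: steps, reason: "coordinator_done" };
    }
    if (steps >= policy.max_steps) {
      return { rounds: round, steps: steps, reason: "max_steps" };
    }
  }
  return { rounds: policy.max_rounds, steps: steps, reason: "max_rounds" };
}

// Example: the coordinator declares the review finished in round 3.
const outcome = runScheduler(
  { max_rounds: 5, max_steps: 50, timeout_ms: 120000 },
  (round) => ["status update for round " + round],
  (round) => ({ done: round === 3, stepsUsed: 4 }),
);
console.log(outcome.reason, outcome.rounds, outcome.steps);
```

Whatever the coordinator does, the loop exits after at most max_rounds iterations, which is the property that keeps a coordination loop from running away.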

The scheduler emits structured run events (scheduler_started, scheduler_agent_started, etc.) for full observability. See Scheduler Events for details.

Precompiled Workflows

For workflows that run repeatedly with different inputs, you can compile once and reuse the plan:

TypeScript:

// Compile once
const { plan_hash } = await mr.workflows.compile(spec);

// Run multiple times
const run = await mr.runs.createFromPlan(plan_hash, {
  input: { task: "Process this" },
});

Go:

// Compile once
compiled, _ := client.Workflows().Compile(ctx, spec)

// Run multiple times
run, _ := client.Runs.CreateFromPlan(ctx, compiled.PlanHash,
    sdk.WithRunInputs(map[string]any{"task": "Process this"}),
)

Rust:

// Compile once
let compiled = client.workflows().compile(spec).await?;

// Run multiple times
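// `options` is assumed to be a RunsCreateOptions value carrying the run input,
// built the same way as in the Quick Start example.
let run = client.runs().create_from_plan(compiled.plan_hash, options).await?;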

See Precompiled Workflows in the Runs API reference for details.

Next Steps