Workflows
Workflows let you define multi-step AI pipelines as declarative specs. Instead of writing imperative code to orchestrate multiple LLM calls, you define a directed acyclic graph (DAG) of nodes and let ModelRelay handle execution, parallelism, and state management.
ModelRelay’s workflow format uses concise {{placeholders}} for data binding between nodes.
Why Workflows?
flowchart LR
A[Input] --> B[Node 1]
A --> C[Node 2]
B --> D[Node 3]
C --> D
D --> E[Output]
Without workflows: You write imperative code to call multiple models, manage intermediate state, handle errors, and coordinate parallel execution.
With workflows: You declare what you want (nodes, dependencies, outputs) and the runtime handles how to execute it efficiently.
Benefits:
- Parallel execution: Independent nodes run concurrently.
- Retry handling: The runtime can retry failed steps where appropriate.
- Event streaming: Monitor progress in real-time via run events.
- Idempotent runs: Repeat requests can return cached results.
- Cost tracking: Aggregated usage across all nodes.
Quick Start
Sequential Chain
Chain steps together with automatic edge wiring:
import { ModelRelay, chain, llm } from "@modelrelay/sdk";
const mr = ModelRelay.fromSecretKey(process.env.MODELRELAY_API_KEY!);
const spec = chain([
llm("summarize", (n) => n.system("Summarize.").user("{{task}}")),
llm("translate", (n) => n.system("Translate to French.").user("{{summarize}}")),
], { name: "summarize-translate", model: "claude-sonnet-4-5" })
.output("result", "translate")
.build();
const { run_id } = await mr.runs.create(spec, {
input: { task: "Explain how photosynthesis works" },
});
spec, _ := sdk.Chain([]sdk.WorkflowIntentNode{
sdk.LLM("summarize", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
return n.System("Summarize.").User("{{task}}")
}),
sdk.LLM("translate", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
return n.System("Translate to French.").User("{{summarize}}")
}),
}, sdk.ChainOptions{Name: "summarize-translate", Model: "claude-sonnet-4-5"}).
Output("result", "translate").
Build()
created, _ := client.Runs.Create(ctx, spec, sdk.WithRunInputs(map[string]any{
"task": "Explain how photosynthesis works",
}))
use modelrelay::{chain, llm, ChainOptions, RunsCreateOptions};
use serde_json::json;
let spec = chain(
vec![
llm("summarize", |n| n.system("Summarize.").user("{{task}}")),
llm("translate", |n| n.system("Translate to French.").user("{{summarize}}")),
],
ChainOptions { name: Some("summarize-translate".into()), model: Some("claude-sonnet-4-5".into()), ..Default::default() },
)
.output("result", "translate", None)
.build()?;
let run = client.runs().create(spec, RunsCreateOptions {
input: Some(json!({ "task": "Explain how photosynthesis works" })),
..Default::default()
}).await?;
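Creating a run returns immediately with a run identifier; the workflow executes asynchronously. To read the named result output once the run finishes, you can poll or stream events. The TypeScript sketch below assumes a hypothetical mr.runs.wait helper that resolves when the run completes; see the Runs API reference for the actual retrieval methods.
// Hypothetical helper: assumes `mr.runs.wait(run_id)` resolves once the run
// finishes and exposes the workflow's named outputs.
const finished = await mr.runs.wait(run_id);
console.log(finished.outputs.result); // French translation from the "translate" node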
Parallel with Aggregation
Fan out to multiple agents, then aggregate their outputs:
import { parallel, llm } from "@modelrelay/sdk";
const spec = parallel([
llm("agent_a", (n) => n.user("Write 3 ideas for {{task}}.")),
llm("agent_b", (n) => n.user("Write 3 objections for {{task}}.")),
], { name: "multi-agent", model: "claude-sonnet-4-5" })
.llm("aggregate", (n) => n.system("Synthesize.").user("{{join}}"))
.edge("join", "aggregate")
.output("result", "aggregate")
.build();
spec, _ := sdk.Parallel([]sdk.WorkflowIntentNode{
sdk.LLM("agent_a", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
return n.User("Write 3 ideas for {{task}}.")
}),
sdk.LLM("agent_b", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
return n.User("Write 3 objections for {{task}}.")
}),
}, sdk.ParallelOptions{Name: "multi-agent", Model: "claude-sonnet-4-5"}).
LLM("aggregate", func(n sdk.LLMNodeBuilder) sdk.LLMNodeBuilder {
return n.System("Synthesize.").User("{{join}}")
}).
Edge("join", "aggregate").
Output("result", "aggregate").
Build()
use modelrelay::{parallel, llm, ParallelOptions};
let spec = parallel(
vec![
llm("agent_a", |n| n.user("Write 3 ideas for {{task}}.")),
llm("agent_b", |n| n.user("Write 3 objections for {{task}}.")),
],
ParallelOptions { name: Some("multi-agent".into()), model: Some("claude-sonnet-4-5".into()), ..Default::default() },
)
.llm("aggregate", |n| n.system("Synthesize.").user("{{join}}"))
.edge("join", "aggregate")
.output("result", "aggregate", None)
.build()?;
Workflow Spec
Top-Level Fields
- `kind`: must be `workflow`
- `name`: optional label
- `model`: optional default model for all LLM nodes
- `max_parallelism`: optional max concurrent node executions (default: 4)
- `nodes`: list of nodes
- `outputs`: named outputs returned by the run
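Combining these fields with the node shape used in the examples below, a minimal spec looks like:
{
  "kind": "workflow",
  "name": "minimal",
  "model": "claude-sonnet-4-5",
  "max_parallelism": 4,
  "nodes": [
    { "id": "summarize", "type": "llm", "user": "Summarize: {{task}}" }
  ],
  "outputs": [{ "name": "result", "from": "summarize" }]
}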
Node Types
A `workflow` supports the following node types:
- `llm`
- `join.all`
- `join.any`
- `join.collect`
- `transform.json`
- `map.fanout`
- `agent.run`
- `agent.scheduler`
Placeholders
Use placeholders inside `user` (and `system`) strings:
- `{{task}}` binds to the run input named `task`.
- `{{node_id}}` or `{{node_id.output}}` binds to another node's output.
- `{{item}}` is available inside `map.fanout` subnodes.
Placeholders create implicit dependencies. You can also set explicit dependencies with `depends_on` on any node.
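For example, to force an ordering that no placeholder implies, a node can name its upstream nodes explicitly. The exact field shape here is an assumption (a list of node ids); see the spec reference for details:
{ "id": "audit", "type": "llm", "user": "Review the results.", "depends_on": ["agent_a", "agent_b"] }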
Map Fan-Out Example
{
"kind": "workflow",
"name": "fanout",
"model": "gpt-5.1",
"nodes": [
{ "id": "generator", "type": "llm", "user": "Generate 3 subquestions for {{task}}" },
{
"id": "fanout",
"type": "map.fanout",
"items_from": "generator",
"items_path": "/questions",
"subnode": { "id": "answer", "type": "llm", "user": "Answer: {{item}}" },
"max_parallelism": 4
},
{ "id": "aggregate", "type": "llm", "user": "Combine: {{fanout.output}}" }
],
"outputs": [{ "name": "final", "from": "aggregate" }]
}
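The `items_path` value reads like a JSON Pointer into the source node's output. Assuming the generator returns structured JSON such as the following (illustrative only), `/questions` selects the array to fan out over, and each element is bound to `{{item}}` in the subnode:
{
  "questions": [
    "What pigments capture light?",
    "How is water split during photosynthesis?",
    "What does the Calvin cycle produce?"
  ]
}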
Join & Aggregation Example
{
"kind": "workflow",
"name": "multi-agent",
"model": "claude-sonnet-4-5",
"nodes": [
{ "id": "agent_a", "type": "llm", "user": "Write 3 ideas for {{task}}." },
{ "id": "agent_b", "type": "llm", "user": "Write 3 objections for {{task}}." },
{ "id": "join", "type": "join.all" },
{ "id": "aggregate", "type": "llm", "user": "Synthesize: {{join}}" }
],
"outputs": [{ "name": "result", "from": "aggregate" }]
}
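A `join.all` node waits for all of its upstream nodes before downstream nodes run. The merged shape that `{{join}}` expands to is not shown in this spec; one plausible reading, keyed by node id, would be:
{
  "agent_a": "1. ...\n2. ...\n3. ...",
  "agent_b": "1. ...\n2. ...\n3. ..."
}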
Agent Scheduler
The agent.scheduler node enables multi-agent coordination loops within a workflow. While standard workflow nodes execute in a fixed DAG pattern, the scheduler can run multiple rounds of agent interactions with message-based coordination.
flowchart LR
WF[Workflow DAG] --> SCH[agent.scheduler]
SCH -->|spawn| A1[Agent 1]
SCH -->|spawn| A2[Agent 2]
A1 -->|messages| SCH
A2 -->|messages| SCH
SCH -->|coordinate| C[Coordinator]
C -->|decision| SCH
When to Use
Use agent.scheduler when you need:
- Multi-round coordination: Agents iterate until consensus or completion
- Dynamic message routing: Coordinator decides who to involve based on context
- Bounded loops: Policy constraints prevent runaway execution
For fixed pipelines (A→B→C), use standard workflow nodes instead.
Scheduler Spec
{
"id": "review_loop",
"type": "agent.scheduler",
"agents": [
{"agent": "coder@1", "count": 2, "input": [{"role": "user", "content": "Implement feature X"}]},
{"agent": "reviewer@1", "count": 1, "input": [{"role": "user", "content": "Review implementations"}]}
],
"coordinator": {
"agent": "lead@1",
"input": [{"role": "user", "content": "Coordinate the review process"}]
},
"policy": {
"max_rounds": 5,
"max_concurrency": 3,
"max_steps": 50,
"timeout_ms": 120000
}
}
Scheduler Fields
| Field | Type | Description |
|---|---|---|
| `agents` | array | Participant agents to spawn (agent slug, count, input) |
| `coordinator` | object | Optional coordinator agent that makes routing decisions |
| `policy.max_rounds` | int | Maximum coordination rounds |
| `policy.max_concurrency` | int | Maximum concurrent agent runs |
| `policy.max_steps` | int | Total step limit across all agents |
| `policy.timeout_ms` | int | Overall timeout in milliseconds |
How It Works
1. Spawn phase: the scheduler creates runs for each participant agent.
2. Coordination rounds, each of which:
   - polls messages from agent mailboxes,
   - runs the coordinator with the collected context, and
   - lets the coordinator decide: send messages, spawn more runs, or stop.
3. Termination: the loop stops when the coordinator signals done or a policy limit is reached.
The scheduler emits structured run events (`scheduler_started`, `scheduler_agent_started`, etc.) for full observability. See Scheduler Events for details.
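As an illustration, a run's event stream can be filtered down to scheduler events. This TypeScript sketch assumes a hypothetical mr.runs.events(run_id) async iterator; the actual streaming interface is documented in the Runs API reference:
// Hypothetical: assumes `mr.runs.events` yields run events as an async iterator.
for await (const event of mr.runs.events(run_id)) {
  if (event.type.startsWith("scheduler_")) {
    console.log(event.type, event.data);
  }
}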
Precompiled Workflows
For workflows that run repeatedly with different inputs, you can compile once and reuse the plan:
// Compile once
const { plan_hash } = await mr.workflows.compile(spec);
// Run multiple times
const run = await mr.runs.createFromPlan(plan_hash, {
input: { task: "Process this" },
});
// Compile once
compiled, _ := client.Workflows().Compile(ctx, spec)
// Run multiple times
run, _ := client.Runs.CreateFromPlan(ctx, compiled.PlanHash,
sdk.WithRunInputs(map[string]any{"task": "Process this"}),
)
// Compile once
let compiled = client.workflows().compile(spec).await?;
// Run multiple times
let run = client.runs().create_from_plan(compiled.plan_hash, options).await?;
See Precompiled Workflows in the Runs API reference for details.
Next Steps
- Runs API - Execute workflows and stream events
- Plugins - Compile Claude Code plugins to workflows
- Workflow Spec Reference - `workflow` spec details