Workflows Cookbook
Complete, runnable cersei-workflows recipes — sequential pipelines, parallel fan-out, branching, shared state, agent and tool steps, nested workflows, suspend/resume, and driving it all from a UI.
Practical recipes built on cersei-workflows. Each is self-contained. They assume:
[dependencies]
cersei = { version = "0.2.1", features = ["workflows"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"

use cersei::workflows::prelude::*;
use serde_json::json;
use std::sync::Arc;

Sequential pipeline
The "hello world": two steps run in order, the second receiving the first step's output.

#[tokio::main]
async fn main() -> cersei::types::Result<()> {
    let registry = StepRegistry::new();
    registry.register(Arc::new(FnStep::new("upper", |input, _ctx| async move {
        let s = input.get("message").and_then(|v| v.as_str()).unwrap_or("");
        Ok(json!({ "message": s.to_uppercase() }))
    })));
    registry.register(Arc::new(FnStep::new("emphasize", |input, _ctx| async move {
        let s = input.get("message").and_then(|v| v.as_str()).unwrap_or("");
        Ok(json!({ "message": format!("{s}!!!") }))
    })));

    let def = WorkflowBuilder::new("greet")
        .then("upper")
        .then("emphasize")
        .commit();
    let wf = Workflow::compile(def, &registry)?;
    let out = wf.start(json!({ "message": "hello world" })).await?;

    assert_eq!(out.status, RunStatus::Success);
    assert_eq!(out.result.unwrap(), json!({ "message": "HELLO WORLD!!!" }));
    // out.steps is a map of node_id -> StepResult with per-step timing.
    Ok(())
}

Parallel fan-out and join
Run independent steps concurrently and collect their outputs into an array. AllOrFail aborts on the first error; AllSettled collects every result, substituting null for a failed branch.

let registry = StepRegistry::new();
registry.register(Arc::new(FnStep::new("weather", |_in, _ctx| async move {
    Ok(json!({ "temp_c": 21 }))
})));
registry.register(Arc::new(FnStep::new("news", |_in, _ctx| async move {
    Ok(json!({ "headline": "Rust 2.0 announced" }))
})));

let def = WorkflowBuilder::new("dashboard")
    .parallel(&["weather", "news"], JoinStrategy::AllOrFail)
    .commit();
let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({})).await?;
// The join produces an array of branch outputs.
// out.result == [ { "temp_c": 21 }, { "headline": "Rust 2.0 announced" } ]

A common pattern is a Map step after the join to reshape the array into a named object (see Reshaping data with Map).
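The difference between the two join strategies can be sketched without the library. The following is a minimal illustration, not the cersei-workflows implementation: it uses only std threads, models branch outputs as strings, and represents the "null for a failed branch" case as `None`. The names `join_branches`, `Branch`, `weather`, and `news` are hypothetical, and the local `JoinStrategy` enum only mirrors the variant names from the text.

```rust
use std::thread;

/// Stand-in for the join policy; only the variant names come from the text above.
#[derive(Clone, Copy)]
pub enum JoinStrategy {
    AllOrFail,
    AllSettled,
}

/// A branch is a plain function here; a failed branch returns `Err`.
pub type Branch = fn() -> Result<String, String>;

/// Runs every branch on its own thread and joins results in branch order.
/// `AllOrFail` returns the first error found in branch order (this sketch does
/// not cancel the other threads); `AllSettled` records a failure as `None`.
pub fn join_branches(
    branches: Vec<Branch>,
    strategy: JoinStrategy,
) -> Result<Vec<Option<String>>, String> {
    let handles: Vec<_> = branches.into_iter().map(|b| thread::spawn(b)).collect();

    let mut joined = Vec::with_capacity(handles.len());
    for handle in handles {
        let outcome = handle.join().map_err(|_| "branch panicked".to_string())?;
        match (outcome, strategy) {
            (Ok(value), _) => joined.push(Some(value)),
            (Err(e), JoinStrategy::AllOrFail) => return Err(e),
            (Err(_), JoinStrategy::AllSettled) => joined.push(None),
        }
    }
    Ok(joined)
}

// Hypothetical branches used to exercise both strategies.
pub fn weather() -> Result<String, String> {
    Ok("21C".to_string())
}

pub fn news() -> Result<String, String> {
    Err("feed down".to_string())
}
```

With `vec![weather as Branch, news]`, `AllOrFail` yields `Err("feed down")`, while `AllSettled` yields a two-element vector whose second slot is `None`.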
Conditional branching
branch evaluates its arms in order against the run scope; the first matching Condition wins, and the other arms never run. Condition::Always is the catch-all "else".
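First-match evaluation is easy to picture in isolation. The sketch below is an illustration under simplifying assumptions, not the library's `Condition` type: the scope is a flat map of boolean flags, and `Cond` and `pick_arm` are hypothetical names. It shows why a trailing `Always` arm behaves like an "else".

```rust
use std::collections::HashMap;

/// Simplified stand-in for a condition (hypothetical, std-only).
pub enum Cond {
    /// True when the flag stored under `key` equals `value`.
    Eq { key: &'static str, value: bool },
    /// True when every inner condition is true.
    And(Vec<Cond>),
    /// Negates the inner condition.
    Not(Box<Cond>),
    /// Catch-all: always true.
    Always,
}

impl Cond {
    pub fn matches(&self, scope: &HashMap<String, bool>) -> bool {
        match self {
            Cond::Eq { key, value } => scope.get(*key) == Some(value),
            Cond::And(all) => all.iter().all(|c| c.matches(scope)),
            Cond::Not(inner) => !inner.matches(scope),
            Cond::Always => true,
        }
    }
}

/// Walks the arms in order and returns the step id of the first match.
/// Later arms are never evaluated once one has matched.
pub fn pick_arm<'a>(
    arms: &[(Cond, &'a str)],
    scope: &HashMap<String, bool>,
) -> Option<&'a str> {
    arms.iter()
        .find(|(cond, _)| cond.matches(scope))
        .map(|(_, step)| *step)
}
```

Because arms are tried in order, placing `Always` anywhere but last would shadow every arm after it.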

let registry = StepRegistry::new();
registry.register(Arc::new(FnStep::new("enrich_paid", |_in, _ctx| async move {
    Ok(json!({ "tier": "paid", "limits": 10_000 }))
})));
registry.register(Arc::new(FnStep::new("enrich_free", |_in, _ctx| async move {
    Ok(json!({ "tier": "free", "limits": 50 }))
})));

let def = WorkflowBuilder::new("entitlements")
    .branch(vec![
        (Condition::Eq { path: "current/premium".into(), value: json!(true) }, "enrich_paid"),
        (Condition::Always, "enrich_free"),
    ])
    .commit();
let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({ "premium": true })).await?;
// Only "enrich_paid" ran. out.steps contains no "enrich_free*" entry.

path is a JSON Pointer over { input, state, steps, current }, where current is the input arriving at the branch. Combine predicates with And / Or / Not:

Condition::And(vec![
    Condition::Truthy { path: "current/active".into() },
    Condition::Gt { path: "current/score".into(), value: 0.8 },
]);

Sharing state across steps
StepContext carries a shared, mutable state (the counterpart of Mastra's setState/getState) for values you don't want to thread through every step's input. Writing to it emits a StateUpdated event, and the final value is returned in WorkflowResult.state.

let registry = StepRegistry::new();
registry.register(Arc::new(FnStep::new("tick", |_in, ctx| async move {
    let mut s = ctx.state();
    let n = s.get("count").and_then(|v| v.as_i64()).unwrap_or(0) + 1;
    s = json!({ "count": n });
    ctx.set_state(s); // visible to later steps and to the UI
    Ok(json!({ "n": n }))
})));

let def = WorkflowBuilder::new("counter").then("tick").then("tick").commit();
let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({})).await?;
// out.state == { "count": 2 }

ctx.state() clones the state; ctx.set_state(v) replaces it. The underlying lock is synchronous, so never hold it across an .await.
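One way to get "clone on read, replace on write" without ever exposing a lock guard is sketched below. This is an assumption about the general shape, not the actual StepContext internals: `SharedState`, `State`, and this standalone `tick` are hypothetical, and the state is simplified to a map of integers.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

/// Simplified state: a map of named counters (hypothetical stand-in for JSON).
pub type State = BTreeMap<String, i64>;

#[derive(Clone, Default)]
pub struct SharedState {
    inner: Arc<Mutex<State>>,
}

impl SharedState {
    /// Returns a snapshot. The guard is a temporary, so the lock is held
    /// only for the duration of the clone and never escapes this method.
    pub fn state(&self) -> State {
        self.inner.lock().unwrap().clone()
    }

    /// Replaces the whole state; the lock is held only for the assignment.
    pub fn set_state(&self, next: State) {
        *self.inner.lock().unwrap() = next;
    }
}

/// One "tick": take a snapshot, compute with no lock held, write back.
pub fn tick(ctx: &SharedState) -> i64 {
    let mut s = ctx.state();
    let n = s.get("count").copied().unwrap_or(0) + 1;
    s.insert("count".to_string(), n);
    // Slow or async work could happen here without blocking other steps.
    ctx.set_state(s);
    n
}
```

Since callers only ever see owned snapshots, there is no guard to carry across an await point. The trade-off is that read-modify-write is not atomic: two concurrent ticks in this sketch could both read the same snapshot and one update would be lost.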
Reshaping data with Map
A Map node is a pure JSON transform (no registry lookup). Each output field is filled from a JSON-pointer path into the scope — handy right after a parallel Join.
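To make the pointer lookup concrete, here is a small std-only sketch of resolving paths like /current/0/temp_c and filling output fields from them. It is illustrative only: the `Json` enum and `apply_map` are hypothetical stand-ins (the library works on serde_json values), pointer escapes (~0, ~1) are omitted, and filling a missing path with `Null` is an assumption of this sketch rather than documented behavior.

```rust
use std::collections::BTreeMap;

/// Minimal JSON-like value (hypothetical; real code would use serde_json).
#[derive(Clone, Debug, PartialEq)]
pub enum Json {
    Null,
    Num(i64),
    Str(String),
    Arr(Vec<Json>),
    Obj(BTreeMap<String, Json>),
}

impl Json {
    /// Resolves a pointer such as "/current/0/temp_c".
    /// Object tokens are keys; array tokens are decimal indices.
    pub fn pointer(&self, path: &str) -> Option<&Json> {
        if path.is_empty() {
            return Some(self);
        }
        let rest = path.strip_prefix('/')?;
        rest.split('/').try_fold(self, |node, token| match node {
            Json::Obj(map) => map.get(token),
            Json::Arr(items) => token.parse::<usize>().ok().and_then(|i| items.get(i)),
            _ => None,
        })
    }
}

/// Builds an object whose fields are looked up in `scope` by pointer.
/// Assumption: a path that does not resolve produces `Json::Null`.
pub fn apply_map(fields: &[(&str, &str)], scope: &Json) -> Json {
    let out: BTreeMap<String, Json> = fields
        .iter()
        .map(|(name, path)| {
            let value = scope.pointer(path).cloned().unwrap_or(Json::Null);
            ((*name).to_string(), value)
        })
        .collect();
    Json::Obj(out)
}
```

The transform touches no step registry and has no side effects, which is what makes it safe to place directly after a join.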

use std::collections::HashMap;

let mut fields = HashMap::new();
fields.insert("temperature".to_string(), "/current/0/temp_c".to_string());
fields.insert("top_story".to_string(), "/current/1/headline".to_string());

let def = WorkflowBuilder::new("dashboard")
    .parallel(&["weather", "news"], JoinStrategy::AllOrFail)
    .map(MapSpec { fields })
    .commit();
// final result == { "temperature": 21, "top_story": "Rust 2.0 announced" }

A step that runs an agent
AgentStep wraps a cersei_agent::Agent. The node input is rendered into the prompt template ({{field}} per field, {{input}} for the whole input), and the step returns { text, turns, stop_reason }.

use cersei::prelude::*;

let agent = Arc::new(
    Agent::builder()
        .provider(Anthropic::from_env()?)
        .model("claude-sonnet-4-6")
        .build()?,
);

let registry = StepRegistry::new();
registry.register(Arc::new(AgentStep::new(
    "classify",
    agent,
    "Classify the sentiment of this review as positive/negative/neutral:\n{{review}}",
)));

let def = WorkflowBuilder::new("review-triage").then("classify").commit();
let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({ "review": "Best purchase of the year." })).await?;
// out.result == { "text": "positive", "turns": 1, "stop_reason": "EndTurn" }

You can mix agent and function steps freely, e.g. .then("classify").then("route") where route is a FnStep that branches on the agent's text.
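The prompt templating described for AgentStep ({{field}} per field, {{input}} for the whole input) can be approximated with a few lines of string handling. The sketch below is a guess at the behavior, not the library's renderer: `render` is a hypothetical function, field values are plain strings, and leaving unknown or unterminated placeholders untouched is an assumption.

```rust
/// Replaces `{{name}}` with the matching entry in `fields` and `{{input}}`
/// with `whole` (the serialized input). Unknown placeholders and an
/// unterminated `{{` are copied through unchanged (assumption of this sketch).
pub fn render(template: &str, fields: &[(&str, &str)], whole: &str) -> String {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;

    while let Some(start) = rest.find("{{") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find("}}") {
            Some(end) => {
                let key = after[..end].trim();
                let value = if key == "input" {
                    Some(whole)
                } else {
                    fields.iter().find(|(k, _)| *k == key).map(|(_, v)| *v)
                };
                match value {
                    Some(v) => out.push_str(v),
                    None => {
                        // Keep the placeholder text exactly as written.
                        out.push_str("{{");
                        out.push_str(&after[..end]);
                        out.push_str("}}");
                    }
                }
                rest = &after[end + 2..];
            }
            None => {
                // No closing braces: emit the remainder verbatim and stop.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```

For example, `render("Review:\n{{review}}", &[("review", "Best purchase.")], "{}")` produces the prompt with the review text substituted in place of the placeholder.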
A step that runs a tool
ToolStep wraps any Tool, reusing its input_schema and dispatching the node input straight into Tool::execute. The output is { content, is_error, metadata }; a tool error becomes a failed step.

use cersei::tools::Tool;

let registry = StepRegistry::new();
registry.register(Arc::new(
    ToolStep::new("read_file", Arc::new(my_read_tool)).working_dir("/srv/project"),
));

let def = WorkflowBuilder::new("inspect").then("read_file").commit();
let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({ "path": "Cargo.toml" })).await?;
// out.result["content"] holds the tool output.

Workflows as steps (nesting)
Compile a child workflow and embed it with WorkflowStep. The child runs to completion and contributes its final result; a child suspend propagates up to the parent.
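Nesting works because a compiled workflow can present the same interface as a single step. A minimal std-only sketch of that composite idea follows; the `Step` trait, `Func`, and `Pipeline` here are hypothetical and synchronous, values are plain strings, and only error propagation is modeled (suspend propagation is left out).

```rust
/// Hypothetical step interface: take an input, return an output or an error.
pub trait Step {
    fn run(&self, input: String) -> Result<String, String>;
}

/// A step backed by a plain function.
pub struct Func(pub fn(String) -> Result<String, String>);

impl Step for Func {
    fn run(&self, input: String) -> Result<String, String> {
        (self.0)(input)
    }
}

/// A sequential pipeline. Because it implements `Step` itself, a whole
/// pipeline can be placed inside another pipeline as one step.
pub struct Pipeline {
    pub steps: Vec<Box<dyn Step>>,
}

impl Step for Pipeline {
    fn run(&self, input: String) -> Result<String, String> {
        // Feed each step's output to the next; stop at the first error.
        self.steps.iter().try_fold(input, |value, step| step.run(value))
    }
}

// Hypothetical steps mirroring the names used in this recipe.
pub fn normalize(s: String) -> Result<String, String> {
    Ok(s.trim().to_lowercase())
}

pub fn validate(s: String) -> Result<String, String> {
    if s.is_empty() { Err("empty input".to_string()) } else { Ok(s) }
}

pub fn index(s: String) -> Result<String, String> {
    Ok(format!("indexed:{s}"))
}
```

A child built from `normalize` and `validate` can then be boxed and used as the first step of a parent whose second step is `index`; a failure inside the child surfaces as a failure of the parent.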

// Assumes `registry` already has "normalize", "validate", and "index" registered.
// Child: normalize -> validate
let child_def = WorkflowBuilder::new("ingest").then("normalize").then("validate").commit();
let child = Workflow::compile(child_def, &registry)?;

// Parent uses the child as a single step.
registry.register(Arc::new(WorkflowStep::new("ingest", child)));
let parent_def = WorkflowBuilder::new("pipeline").then("ingest").then("index").commit();
let parent = Workflow::compile(parent_def, &registry)?;
let out = parent.start(json!({ "raw": "…" })).await?;

Human-in-the-loop (suspend / resume)
A step returns StepOutcome::Suspended to pause the run — e.g. waiting on an approval. The run comes back with status Suspended and a SuspendPoint. Later, Workflow::resume replays the completed steps from the snapshot and continues from the suspended node, injecting resume_data.

let registry = StepRegistry::new();
registry.register(Arc::new(FnStep::with_outcome("approval", |input, ctx| async move {
    match ctx.resume_data {
        Some(decision) => Ok(StepOutcome::Done(json!({ "approved": decision }))),
        None => Ok(StepOutcome::Suspended {
            resume_schema: json!({ "type": "boolean" }),
            payload: input, // surfaced to the caller (e.g. render an approve/reject form)
        }),
    }
})));
registry.register(Arc::new(FnStep::new("publish", |input, _ctx| async move {
    Ok(json!({ "published": input.get("approved") == Some(&json!(true)) }))
})));

let def = WorkflowBuilder::new("publish-flow").then("approval").then("publish").commit();
let wf = Workflow::compile(def, &registry)?;

// 1. First run suspends at the approval gate.
let suspended = wf.start(json!({ "doc": "release-notes" })).await?;
assert_eq!(suspended.status, RunStatus::Suspended);
let node = suspended.suspended[0].node_id.clone();

// 2. A human approves; resume continues to completion.
let done = wf.resume(&suspended.run_id, &node, json!(true)).await?;
assert_eq!(done.status, RunStatus::Success);
// done.result == { "published": true }

The MVP RunStore is in-memory, so resume must happen in the same process. The same API is designed to gain a durable backend so suspended runs survive restarts.
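The mechanics of an in-memory suspend/resume loop can be sketched in plain Rust. This is a simplified model, not the library's RunStore: `Engine`, `Outcome`, `Run`, and the step functions are hypothetical, values are strings, and instead of replaying completed steps the sketch simply stores the suspended step's input and restarts from that node.

```rust
use std::collections::HashMap;

/// What a step can report back (hypothetical, simplified).
pub enum Outcome {
    Done(String),
    Suspended,
}

#[derive(Debug, PartialEq)]
pub enum Run {
    Success(String),
    Suspended { run_id: u64, node: usize },
}

/// A step receives its input and, only when resumed, the resume data.
pub type Step = fn(&str, Option<&str>) -> Outcome;

pub struct Engine {
    steps: Vec<Step>,
    /// In-memory snapshots: run id -> (suspended node index, that node's input).
    store: HashMap<u64, (usize, String)>,
    next_id: u64,
}

impl Engine {
    pub fn new(steps: Vec<Step>) -> Self {
        Engine { steps, store: HashMap::new(), next_id: 1 }
    }

    pub fn start(&mut self, input: &str) -> Run {
        let run_id = self.next_id;
        self.next_id += 1;
        self.drive(run_id, 0, input.to_string(), None)
    }

    /// Returns `None` when no suspended run with this id is in the store.
    pub fn resume(&mut self, run_id: u64, data: &str) -> Option<Run> {
        let (node, input) = self.store.remove(&run_id)?;
        Some(self.drive(run_id, node, input, Some(data)))
    }

    fn drive(&mut self, run_id: u64, from: usize, mut value: String, mut resume: Option<&str>) -> Run {
        for node in from..self.steps.len() {
            // `take` hands the resume data to the first (suspended) step only.
            match (self.steps[node])(&value, resume.take()) {
                Outcome::Done(next) => value = next,
                Outcome::Suspended => {
                    self.store.insert(run_id, (node, value));
                    return Run::Suspended { run_id, node };
                }
            }
        }
        Run::Success(value)
    }
}

// Hypothetical steps mirroring the approval flow above.
pub fn approval(input: &str, resume: Option<&str>) -> Outcome {
    match resume {
        Some(decision) => Outcome::Done(format!("{input}:approved={decision}")),
        None => Outcome::Suspended,
    }
}

pub fn publish(input: &str, _resume: Option<&str>) -> Outcome {
    Outcome::Done(format!("published({input})"))
}
```

Because the snapshot lives in a `HashMap` owned by the engine, dropping the engine (or restarting the process) loses every suspended run, which is the limitation the in-memory store note describes.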
Streaming live status to a UI
stream() returns serializable events — forward them to a browser to animate the graph in real time.

let wf = Workflow::compile(def, &registry)?; // Arc<Workflow>
let mut stream = wf.stream(json!({ "message": "hi" }));
while let Some(event) = stream.next().await {
    match &event {
        WorkflowEvent::StepStarted { node_id, .. } => println!("▶ {node_id}"),
        WorkflowEvent::StepCompleted { node_id, .. } => println!("✓ {node_id}"),
        WorkflowEvent::WorkflowCompleted { result } => println!("done: {:?}", result.status),
        _ => {}
    }
    // Every event is Serialize, so it can be pushed straight to the client:
    let _frame = serde_json::to_string(&event)?;
    // ws.send(_frame).await?;
}

Driving a workflow from the visual builder
The visual builder (React + xyflow) is just another producer of WorkflowDef JSON. The host receives JSON, deserializes it, compiles it against the steps it has registered, and runs it. Nothing about execution differs from a builder-authored workflow.

// Received from the UI as JSON:
let body = r#"{
    "id": "greet",
    "input_schema": null,
    "output_schema": null,
    "entry": "upper_1",
    "nodes": [
        { "id": "upper_1", "kind": { "step": { "step_id": "upper", "config": null } } },
        { "id": "emphasize_2", "kind": { "step": { "step_id": "emphasize", "config": null } } }
    ],
    "edges": [ { "from": "upper_1", "to": "emphasize_2", "kind": "then" } ]
}"#;
let def: WorkflowDef = serde_json::from_str(body)?;
let wf = Workflow::compile(def, &registry)?; // fails loudly if a step_id isn't registered
let out = wf.start(json!({ "message": "hi" })).await?;

Expose the available steps to the builder palette with registry.catalog(). Each StepInfo carries the id, description, and input/output schemas the UI needs to render a node and validate wiring.

let palette = registry.catalog();
let palette_json = serde_json::to_string(&palette)?; // -> the builder's node menu

Validation happens when the workflow is compiled, not while it runs. Workflow::compile rejects an unknown step_id (unknown step: '…') and a malformed graph (missing entry, a branch with no arms, a loop body that doesn't exist) before any step executes, so a bad drawing from the UI fails fast with a clear message.
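The kind of checks a compile pass performs can be sketched independently of the library. The code below is a hypothetical, std-only model: `Def`, `Node`, `Edge`, and `validate` are invented for illustration, only three checks are shown, and apart from the "unknown step" wording quoted above the error messages are made up.

```rust
use std::collections::HashSet;

/// Hypothetical, trimmed-down graph definition.
pub struct Node {
    pub id: &'static str,
    pub step_id: &'static str,
}

pub struct Edge {
    pub from: &'static str,
    pub to: &'static str,
}

pub struct Def {
    pub entry: &'static str,
    pub nodes: Vec<Node>,
    pub edges: Vec<Edge>,
}

/// Checks the graph before anything runs: the entry node must exist, every
/// node must name a registered step, and every edge must join known nodes.
pub fn validate(def: &Def, registered: &HashSet<&str>) -> Result<(), String> {
    let ids: HashSet<&str> = def.nodes.iter().map(|n| n.id).collect();

    if !ids.contains(def.entry) {
        return Err(format!("missing entry: '{}'", def.entry));
    }
    for node in &def.nodes {
        if !registered.contains(node.step_id) {
            return Err(format!("unknown step: '{}'", node.step_id));
        }
    }
    for edge in &def.edges {
        for end in [edge.from, edge.to] {
            if !ids.contains(end) {
                return Err(format!("edge references unknown node: '{end}'"));
            }
        }
    }
    Ok(())
}
```

Running all checks up front means a definition produced by a UI is either fully accepted or rejected with a single message, and no step has side effects before the graph is known to be well-formed.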