Cersei

Workflows Cookbook

Complete, runnable cersei-workflows recipes — sequential pipelines, parallel fan-out, branching, shared state, agent and tool steps, nested workflows, suspend/resume, and driving it all from a UI.

Workflows Cookbook

Practical recipes built on cersei-workflows. Each is self-contained. They assume:

[dependencies]
cersei = { version = "0.2.1", features = ["workflows"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"
use cersei::workflows::prelude::*;
use serde_json::json;
use std::sync::Arc;

Sequential pipeline

The "hello world" — three steps, each receiving the previous step's output.

#[tokio::main]
async fn main() -> cersei::types::Result<()> {
    let registry = StepRegistry::new();
    registry.register(Arc::new(FnStep::new("upper", |input, _ctx| async move {
        let s = input.get("message").and_then(|v| v.as_str()).unwrap_or("");
        Ok(json!({ "message": s.to_uppercase() }))
    })));
    registry.register(Arc::new(FnStep::new("emphasize", |input, _ctx| async move {
        let s = input.get("message").and_then(|v| v.as_str()).unwrap_or("");
        Ok(json!({ "message": format!("{s}!!!") }))
    })));

    let def = WorkflowBuilder::new("greet")
        .then("upper")
        .then("emphasize")
        .commit();

    let wf = Workflow::compile(def, &registry)?;
    let out = wf.start(json!({ "message": "hello world" })).await?;

    assert_eq!(out.status, RunStatus::Success);
    assert_eq!(out.result.unwrap(), json!({ "message": "HELLO WORLD!!!" }));
    // out.steps is a map of node_id -> StepResult with per-step timing.
    Ok(())
}

Parallel fan-out and join

Run independent steps concurrently and collect their outputs into an array. AllOrFail aborts on the first error; AllSettled collects every result, substituting null for a failed branch.

let registry = StepRegistry::new();
registry.register(Arc::new(FnStep::new("weather", |_in, _ctx| async move {
    Ok(json!({ "temp_c": 21 }))
})));
registry.register(Arc::new(FnStep::new("news", |_in, _ctx| async move {
    Ok(json!({ "headline": "Rust 2.0 announced" }))
})));

let def = WorkflowBuilder::new("dashboard")
    .parallel(&["weather", "news"], JoinStrategy::AllOrFail)
    .commit();

let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({})).await?;

// The join produces an array of branch outputs.
// out.result == [ { "temp_c": 21 }, { "headline": "Rust 2.0 announced" } ]

A common pattern is a Map step after the join to reshape the array into a named object — see Reshaping data.


Conditional branching

branch evaluates its arms in order against the run scope; the first matching Condition wins, and the other arms never run. Condition::Always is the catch-all "else".

let registry = StepRegistry::new();
registry.register(Arc::new(FnStep::new("enrich_paid", |_in, _ctx| async move {
    Ok(json!({ "tier": "paid", "limits": 10_000 }))
})));
registry.register(Arc::new(FnStep::new("enrich_free", |_in, _ctx| async move {
    Ok(json!({ "tier": "free", "limits": 50 }))
})));

let def = WorkflowBuilder::new("entitlements")
    .branch(vec![
        (Condition::Eq { path: "current/premium".into(), value: json!(true) }, "enrich_paid"),
        (Condition::Always, "enrich_free"),
    ])
    .commit();

let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({ "premium": true })).await?;
// Only "enrich_paid" ran. out.steps contains no "enrich_free*" entry.

path is a JSON Pointer over { input, state, steps, current }, where current is the input arriving at the branch. Combine predicates with And / Or / Not:

Condition::And(vec![
    Condition::Truthy { path: "current/active".into() },
    Condition::Gt { path: "current/score".into(), value: 0.8 },
]);

Sharing state across steps

StepContext carries a shared, mutable state (Mastra's setState/getState) for values you don't want to thread through every step's input. Writing it emits a StateUpdated event, and it lands in WorkflowResult.state.

let registry = StepRegistry::new();
registry.register(Arc::new(FnStep::new("tick", |_in, ctx| async move {
    let mut s = ctx.state();
    let n = s.get("count").and_then(|v| v.as_i64()).unwrap_or(0) + 1;
    s = json!({ "count": n });
    ctx.set_state(s);            // visible to later steps and to the UI
    Ok(json!({ "n": n }))
})));

let def = WorkflowBuilder::new("counter").then("tick").then("tick").commit();
let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({})).await?;
// out.state == { "count": 2 }

ctx.state() clones the state; ctx.set_state(v) replaces it. The underlying lock is synchronous — never hold it across an .await.


Reshaping data with Map

A Map node is a pure JSON transform (no registry lookup). Each output field is filled from a JSON-pointer path into the scope — handy right after a parallel Join.

use std::collections::HashMap;

let mut fields = HashMap::new();
fields.insert("temperature".to_string(), "/current/0/temp_c".to_string());
fields.insert("top_story".to_string(),  "/current/1/headline".to_string());

let def = WorkflowBuilder::new("dashboard")
    .parallel(&["weather", "news"], JoinStrategy::AllOrFail)
    .map(MapSpec { fields })
    .commit();
// final result == { "temperature": 21, "top_story": "Rust 2.0 announced" }

A step that runs an agent

AgentStep wraps a cersei_agent::Agent. The node input is rendered into the prompt template ({{field}} per field, {{input}} for the whole input), and the step returns { text, turns, stop_reason }.

use cersei::prelude::*;

let agent = Arc::new(
    Agent::builder()
        .provider(Anthropic::from_env()?)
        .model("claude-sonnet-4-6")
        .build()?,
);

let registry = StepRegistry::new();
registry.register(Arc::new(AgentStep::new(
    "classify",
    agent,
    "Classify the sentiment of this review as positive/negative/neutral:\n{{review}}",
)));

let def = WorkflowBuilder::new("review-triage").then("classify").commit();
let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({ "review": "Best purchase of the year." })).await?;
// out.result == { "text": "positive", "turns": 1, "stop_reason": "EndTurn" }

You can mix agent and function steps freely — e.g. .then("classify").then("route") where route is a FnStep that branches on the agent's text.


A step that runs a tool

ToolStep wraps any Tool, reusing its input_schema and dispatching the node input straight into Tool::execute. The output is { content, is_error, metadata }; a tool error becomes a failed step.

use cersei::tools::Tool;

let registry = StepRegistry::new();
registry.register(Arc::new(
    ToolStep::new("read_file", Arc::new(my_read_tool)).working_dir("/srv/project"),
));

let def = WorkflowBuilder::new("inspect").then("read_file").commit();
let wf = Workflow::compile(def, &registry)?;
let out = wf.start(json!({ "path": "Cargo.toml" })).await?;
// out.result["content"] holds the tool output.

Workflows as steps (nesting)

Compile a child workflow and embed it with WorkflowStep. The child runs to completion and contributes its final result; a child suspend propagates up to the parent.

// Child: normalize -> validate
let child_def = WorkflowBuilder::new("ingest").then("normalize").then("validate").commit();
let child = Workflow::compile(child_def, &registry)?;

// Parent uses the child as a single step.
registry.register(Arc::new(WorkflowStep::new("ingest", child)));
let parent_def = WorkflowBuilder::new("pipeline").then("ingest").then("index").commit();
let parent = Workflow::compile(parent_def, &registry)?;
let out = parent.start(json!({ "raw": "…" })).await?;

Human-in-the-loop (suspend / resume)

A step returns StepOutcome::Suspended to pause the run — e.g. waiting on an approval. The run comes back with status Suspended and a SuspendPoint. Later, Workflow::resume replays the completed steps from the snapshot and continues from the suspended node, injecting resume_data.

let registry = StepRegistry::new();
registry.register(Arc::new(FnStep::with_outcome("approval", |input, ctx| async move {
    match ctx.resume_data {
        Some(decision) => Ok(StepOutcome::Done(json!({ "approved": decision }))),
        None => Ok(StepOutcome::Suspended {
            resume_schema: json!({ "type": "boolean" }),
            payload: input, // surfaced to the caller (e.g. render an approve/reject form)
        }),
    }
})));
registry.register(Arc::new(FnStep::new("publish", |input, _ctx| async move {
    Ok(json!({ "published": input.get("approved") == Some(&json!(true)) }))
})));

let def = WorkflowBuilder::new("publish-flow").then("approval").then("publish").commit();
let wf = Workflow::compile(def, &registry)?;

// 1. First run suspends at the approval gate.
let suspended = wf.start(json!({ "doc": "release-notes" })).await?;
assert_eq!(suspended.status, RunStatus::Suspended);
let node = suspended.suspended[0].node_id.clone();

// 2. A human approves; resume continues to completion.
let done = wf.resume(&suspended.run_id, &node, json!(true)).await?;
assert_eq!(done.status, RunStatus::Success);
// done.result == { "published": true }

The MVP RunStore is in-memory, so resume must happen in the same process. The same API is designed to gain a durable backend so suspended runs survive restarts.


Streaming live status to a UI

stream() returns serializable events — forward them to a browser to animate the graph in real time.

let wf = Workflow::compile(def, &registry)?; // Arc<Workflow>
let mut stream = wf.stream(json!({ "message": "hi" }));

while let Some(event) = stream.next().await {
    match &event {
        WorkflowEvent::StepStarted { node_id, .. }   => println!("▶ {node_id}"),
        WorkflowEvent::StepCompleted { node_id, .. } => println!("✓ {node_id}"),
        WorkflowEvent::WorkflowCompleted { result }  => println!("done: {:?}", result.status),
        _ => {}
    }
    // Every event is Serialize — push it straight to the client:
    let _frame = serde_json::to_string(&event)?;
    // ws.send(_frame).await?;
}

Driving a workflow from the visual builder

The visual builder (React + xyflow) is just another producer of WorkflowDef JSON. The host receives JSON, deserializes it, compiles it against the steps it has registered, and runs it. Nothing about execution differs from a builder-authored workflow.

// Received from the UI as JSON:
let body = r#"{
  "id": "greet",
  "input_schema": null,
  "output_schema": null,
  "entry": "upper_1",
  "nodes": [
    { "id": "upper_1",     "kind": { "step": { "step_id": "upper",     "config": null } } },
    { "id": "emphasize_2", "kind": { "step": { "step_id": "emphasize", "config": null } } }
  ],
  "edges": [ { "from": "upper_1", "to": "emphasize_2", "kind": "then" } ]
}"#;

let def: WorkflowDef = serde_json::from_str(body)?;
let wf = Workflow::compile(def, &registry)?; // fails loudly if a step_id isn't registered
let out = wf.start(json!({ "message": "hi" })).await?;

Expose the available steps to the builder palette with registry.catalog() — each StepInfo carries the id, description, and input/output schemas the UI needs to render a node and validate wiring.

let palette = registry.catalog();
let palette_json = serde_json::to_string(&palette)?; // -> the builder's node menu

Validation is at compile time, not run time. Workflow::compile rejects an unknown step_id (unknown step: '…') and a malformed graph (missing entry, a branch with no arms, a loop body that doesn't exist) before any step executes — so a bad drawing from the UI fails fast with a clear message.

On this page