Cersei

Workflows API

API reference for cersei-workflows — the Step trait, the WorkflowDef IR, conditions, the registry, the builder, execution, events, and results.

Workflows API

Everything in cersei-workflows, grouped by module. Import the common types from the prelude:

use cersei::workflows::prelude::*;
// or: use cersei_workflows::prelude::*;

Step — the unit of work

A step is the workflow analogue of a Tool: an id, input/output JSON schemas, and an async execute.

#[async_trait]
pub trait Step: Send + Sync {
    fn id(&self) -> &str;
    fn description(&self) -> &str { "" }
    fn input_schema(&self) -> serde_json::Value { Value::Null }
    fn output_schema(&self) -> serde_json::Value { Value::Null }
    async fn execute(&self, input: Value, ctx: &StepContext) -> Result<StepOutcome>;
}

StepOutcome

pub enum StepOutcome {
    Done(Value),                                        // produced output, step complete
    Suspended { resume_schema: Value, payload: Value }, // pause until resumed
}

StepOutcome::done(json!({ "ok": true })); // convenience for Done

Returning Suspended halts the run with status Suspended; the run is resumed later with Workflow::resume. See suspend/resume.

StepContext

Passed by reference to every execute, like ToolContext.

pub struct StepContext {
    pub run_id: String,
    pub state: Arc<Mutex<Value>>,        // shared, mutable workflow state
    pub events: mpsc::Sender<WorkflowEvent>,
    pub extensions: Extensions,          // cersei_tools type-map for runtime injection
    pub resume_data: Option<Value>,      // Some(_) only on a resumed step
}

impl StepContext {
    pub fn state(&self) -> Value;          // clone the current shared state
    pub fn set_state(&self, value: Value); // replace it and emit StateUpdated
}

state is a synchronous lock. Read or write it and drop the guard — never hold it across an .await.

StepRun (typed)

The typed counterpart, mirroring ToolExecute. It is the intended target of a future #[derive(Step)] macro.

#[async_trait]
pub trait StepRun: Send + Sync {
    type Input:  DeserializeOwned + JsonSchema + Send;
    type Output: Serialize + JsonSchema + Send;
    fn id(&self) -> &str;
    async fn run(&self, input: Self::Input, ctx: &StepContext) -> Result<Self::Output>;
}

First-party steps

TypeWrapsOutput shape
FnStepan async closurewhatever the closure returns
AgentStepa cersei_agent::Agent{ text, turns, stop_reason }
ToolStepany cersei_tools::Tool{ content, is_error, metadata }
WorkflowStepa nested Workflowthe nested workflow's final result

FnStep

// Output a plain Value:
FnStep::new("upper", |input: Value, ctx: StepContext| async move {
    Ok(json!({ "message": /* … */ }))
})
.description("uppercases message")
.input_schema(json!({ "type": "object" }))
.output_schema(json!({ "type": "object" }));

// Output a full StepOutcome (needed to suspend):
FnStep::with_outcome("gate", |input, ctx| async move {
    match ctx.resume_data {
        Some(data) => Ok(StepOutcome::Done(json!({ "approved": data }))),
        None => Ok(StepOutcome::Suspended {
            resume_schema: json!({ "type": "boolean" }),
            payload: input,
        }),
    }
});

AgentStep

Renders the node input into a prompt ({{input}} for the whole input as a string, {{field}} for a top-level field), runs the agent to completion, and returns { text, turns, stop_reason }.

let agent = Arc::new(Agent::builder().provider(/* … */).build()?);
AgentStep::new("triage", agent, "Classify this ticket:\n{{body}}");

ToolStep

Reuses the tool's own input_schema, dispatches the node input straight into Tool::execute, and surfaces a tool error as a failed step. The workflow's extensions flow into the constructed ToolContext.

ToolStep::new("read", Arc::new(my_tool)).working_dir("/srv/project");

WorkflowStep

Runs a compiled nested workflow; a nested suspend propagates upward.

WorkflowStep::new("subflow", Arc::new(child_workflow));

The IR — WorkflowDef

The serializable graph. Identical whether produced by the builder or the UI.

pub struct WorkflowDef {
    pub id: String,
    pub input_schema: Value,
    pub output_schema: Value,
    pub nodes: Vec<WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    pub entry: NodeId,           // where execution begins
}
pub type NodeId = String;        // string ids so xyflow owns them

pub struct WorkflowNode { pub id: NodeId, pub kind: NodeKind, pub ui: Option<UiHints> }
pub struct WorkflowEdge { pub from: NodeId, pub to: NodeId, pub kind: EdgeKind }

NodeKind

pub enum NodeKind {
    Step { step_id: String, config: Value }, // run a registered step (config merged into input)
    Map { mapping: MapSpec },                 // pure JSON reshape
    Parallel,                                 // fan-out marker
    Join { strategy: JoinStrategy },          // wait for parallel branches
    Branch,                                   // first matching `When` arm wins
    Loop { mode: LoopMode, body: NodeId, condition: Option<Condition> },
}

pub enum LoopMode { DoWhile, DoUntil, ForEach { concurrency: usize } }
pub enum JoinStrategy { AllOrFail, AllSettled }

EdgeKind

pub enum EdgeKind {
    Then,                          // sequential continuation
    Fork,                          // Parallel node -> a branch
    Merge,                         // a branch -> Join node
    When { condition: Condition }, // a Branch arm
    LoopBack,                      // a loop body's tail -> its Loop node (the only legal cycle)
}

MapSpec and UiHints

pub struct MapSpec { pub fields: HashMap<String, String> } // out_field -> JSON-pointer into scope
pub struct UiHints { pub x, y: Option<f64>, pub label, color: Option<String> }

How the IR maps to xyflow

Builder / MastraIRxyflow
.then(step)Step node + Then edgedefault node + default edge
.parallel([...])ParallelFork edges → JoinMerge edgesone source → N targets, join node
.branch([[cond, step]])Branch + When edges (labeled by predicate)node with labeled edges
.dowhile / .dountilLoop + LoopBacknode with an animated back edge
.foreachLoop { ForEach }node badged "foreach"
.map(fn)Map node"transform" node

Condition

Serializable branch predicates over the run scope { input, state, steps: { <node_id>: <output> }, current }. Paths are JSON Pointers (/current/premium, /steps/score_1/value); a leading / is optional.

pub enum Condition {
    Always,
    Eq { path: String, value: Value },
    Ne { path: String, value: Value },
    Gt { path: String, value: f64 },
    Lt { path: String, value: f64 },
    Exists { path: String },
    Truthy { path: String },
    And(Vec<Condition>),
    Or(Vec<Condition>),
    Not(Box<Condition>),
}

impl Condition { pub fn eval(&self, scope: &Value) -> bool; }
Condition::And(vec![
    Condition::Truthy { path: "current/active".into() },
    Condition::Gt { path: "steps/score_1/value".into(), value: 0.8 },
]);

StepRegistry

Host-side map of step-id → Arc<dyn Step>. UI JSON references steps by id only; the registry supplies the implementations, and Workflow::compile resolves them.

let registry = StepRegistry::new(); // Arc<StepRegistry>
registry.register(Arc::new(my_step));

registry.get("upper");      // Option<Arc<dyn Step>>
registry.ids();             // Vec<String>
registry.catalog();         // Vec<StepInfo> — the UI builder palette

// Builder-style chaining:
let registry = StepRegistry::new()
    .with(Arc::new(step_a))
    .with(Arc::new(step_b));
pub struct StepInfo { pub id, description: String, pub input_schema, output_schema: Value }

WorkflowBuilder

The programmatic front-end. Emits the same WorkflowDef the UI does; it auto-generates node ids and synthesizes Parallel/Join/Branch marker nodes.

let def = WorkflowBuilder::new("my-flow")
    .input_schema(json!({ "type": "object" }))
    .output_schema(json!({ "type": "object" }))
    .then("step_a")                          // Step + Then
    .then_with("step_b", json!({ "cfg": 1 })) // Step carrying static config
    .map(MapSpec { fields })                  // Map reshape
    .parallel(&["fetch_x", "fetch_y"], JoinStrategy::AllOrFail)
    .branch(vec![
        (Condition::Truthy { path: "current/urgent".into() }, "escalate"),
        (Condition::Always, "normal"),
    ])
    .dowhile("poll", Condition::Truthy { path: "current/pending".into() })
    .dountil("retry", Condition::Truthy { path: "current/ok".into() })
    .commit();                                // -> WorkflowDef

Workflow

A validated, executable workflow. compile resolves every Step node against the registry and checks structural sanity (single entry, branch arms are When, loop bodies exist, no stray cycles) before any execution.

pub fn compile(def: WorkflowDef, registry: &StepRegistry) -> Result<Arc<Workflow>>;

pub async fn start(&self, input: Value) -> Result<WorkflowResult>;
pub fn stream(self: &Arc<Self>, input: Value) -> WorkflowStream;
pub async fn resume(&self, run_id: &str, node_id: &NodeId, data: Value) -> Result<WorkflowResult>;

pub fn def(&self) -> &WorkflowDef;

Compilation rejects an unknown step id with CerseiError::Tool("unknown step: '…'") and a malformed graph with CerseiError::Config(…).


Events — WorkflowEvent / WorkflowStream

Every variant is Serialize (externally tagged, snake_case), so it can be forwarded to a browser to drive the live graph.

pub enum WorkflowEvent {
    WorkflowStarted { run_id },
    StepStarted { node_id, step_id, input },
    StepCompleted { node_id, output, duration_ms },
    StepFailed { node_id, error },
    StepSuspended { node_id, resume_schema },
    BranchTaken { node_id, edge_to },
    StateUpdated { state },
    LoopIteration { node_id, iteration },
    Inner { node_id, inner },                 // bridged nested agent/workflow activity
    WorkflowCompleted { result },             // carries the terminal WorkflowResult
    Error(String),
}
pub struct WorkflowStream { /* … */ }
impl WorkflowStream {
    pub async fn next(&mut self) -> Option<WorkflowEvent>;
    pub fn cancel(&self);
    pub fn resume(&self, node_id: NodeId, data: Value);
    pub async fn collect(self) -> Option<WorkflowResult>; // drain, return the terminal result
}

Results — WorkflowResult / RunStatus

Shaped to match Mastra's JSON output so a UI can render it directly.

pub enum RunStatus { Running, Success, Failed, Suspended, Paused } // serde: lowercase

pub struct WorkflowResult {
    pub run_id: String,
    pub status: RunStatus,
    pub input: Value,
    pub steps: HashMap<NodeId, StepResult>,
    pub result: Option<Value>,   // final output on Success
    pub state: Value,            // terminal shared state
    pub error: Option<String>,
    pub suspended: Vec<SuspendPoint>,
}

pub struct StepResult {
    pub status: RunStatus,
    pub output: Option<Value>,
    pub error: Option<String>,
    pub started_at: i64,
    pub ended_at: Option<i64>,
}

pub struct SuspendPoint { pub node_id: NodeId, pub resume_schema: Value, pub payload: Value }

Persistence — RunStore

Backs suspend/resume. The MVP is an in-memory store keyed by run_id; the same API is intended to gain a durable JSONL / cersei-memory backend so suspended runs survive a restart.

pub struct RunSnapshot { pub run_id, pub input, pub state: Value, pub outputs: HashMap<NodeId, Value> }

let store = RunStore::in_memory();
store.save(snapshot);
store.load(&run_id); // Option<RunSnapshot>

A compiled Workflow owns its store; start() saves a snapshot when a step suspends, and resume() reloads it, replays the already-computed nodes from the snapshot, and continues from the suspended node with the supplied resume_data.

On this page