Workflows API
API reference for cersei-workflows — the Step trait, the WorkflowDef IR, conditions, the registry, the builder, execution, events, and results.
Everything in cersei-workflows, grouped by module. Import the common types from the prelude:
use cersei::workflows::prelude::*;
// or: use cersei_workflows::prelude::*;
Step — the unit of work
A step is the workflow analogue of a Tool: an id, input/output JSON schemas, and an async execute.
#[async_trait]
pub trait Step: Send + Sync {
fn id(&self) -> &str;
fn description(&self) -> &str { "" }
fn input_schema(&self) -> serde_json::Value { Value::Null }
fn output_schema(&self) -> serde_json::Value { Value::Null }
async fn execute(&self, input: Value, ctx: &StepContext) -> Result<StepOutcome>;
}
StepOutcome
pub enum StepOutcome {
Done(Value), // produced output, step complete
Suspended { resume_schema: Value, payload: Value }, // pause until resumed
}
StepOutcome::done(json!({ "ok": true })); // convenience for Done
Returning Suspended halts the run with status Suspended; the run is resumed later with Workflow::resume. See suspend/resume.
StepContext
Passed by reference to every execute, like ToolContext.
pub struct StepContext {
pub run_id: String,
pub state: Arc<Mutex<Value>>, // shared, mutable workflow state
pub events: mpsc::Sender<WorkflowEvent>,
pub extensions: Extensions, // cersei_tools type-map for runtime injection
pub resume_data: Option<Value>, // Some(_) only on a resumed step
}
impl StepContext {
pub fn state(&self) -> Value; // clone the current shared state
pub fn set_state(&self, value: Value); // replace it and emit StateUpdated
}
state is a synchronous lock. Read or write it and drop the guard — never hold it across an .await.
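The guard rule can be sketched with std alone. This is a minimal illustration, not the crate's code: `bump` is a hypothetical helper, and a `u64` stands in for the JSON state value so the example compiles without serde_json.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical helper: increment a counter kept in shared state.
// `u64` is a stand-in for the JSON state value.
fn bump(state: &Arc<Mutex<u64>>) -> u64 {
    // The guard lives only inside this inner block, so the lock is
    // released before the function does anything else.
    let snapshot = {
        let mut guard = state.lock().unwrap();
        *guard += 1;
        *guard
    };
    // In an async step, any `.await` would belong here, after the guard
    // has been dropped, working from the copied `snapshot`.
    snapshot
}
```

Copying the value out and letting the block end is what keeps the lock from being held across a suspension point.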
StepRun (typed)
The typed counterpart, mirroring ToolExecute. It is the intended target of a future #[derive(Step)] macro.
#[async_trait]
pub trait StepRun: Send + Sync {
type Input: DeserializeOwned + JsonSchema + Send;
type Output: Serialize + JsonSchema + Send;
fn id(&self) -> &str;
async fn run(&self, input: Self::Input, ctx: &StepContext) -> Result<Self::Output>;
}
First-party steps
| Type | Wraps | Output shape |
|---|---|---|
| FnStep | an async closure | whatever the closure returns |
| AgentStep | a cersei_agent::Agent | { text, turns, stop_reason } |
| ToolStep | any cersei_tools::Tool | { content, is_error, metadata } |
| WorkflowStep | a nested Workflow | the nested workflow's final result |
FnStep
// Output a plain Value:
FnStep::new("upper", |input: Value, ctx: StepContext| async move {
Ok(json!({ "message": /* … */ }))
})
.description("uppercases message")
.input_schema(json!({ "type": "object" }))
.output_schema(json!({ "type": "object" }));
// Output a full StepOutcome (needed to suspend):
FnStep::with_outcome("gate", |input, ctx| async move {
match ctx.resume_data {
Some(data) => Ok(StepOutcome::Done(json!({ "approved": data }))),
None => Ok(StepOutcome::Suspended {
resume_schema: json!({ "type": "boolean" }),
payload: input,
}),
}
});
AgentStep
Renders the node input into a prompt ({{input}} for the whole input as a string, {{field}} for a top-level field), runs the agent to completion, and returns { text, turns, stop_reason }.
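The placeholder rule can be sketched as a single-pass substitution. This is an illustration under assumptions, not AgentStep's actual renderer: `render` is a hypothetical function, the input arrives already stringified, and leaving an unknown placeholder untouched is a guess at the behavior.

```rust
// Hypothetical renderer. `whole` is the entire input as a string and
// `fields` holds its top-level fields, already stringified.
fn render(template: &str, whole: &str, fields: &[(&str, &str)]) -> String {
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find("{{") {
        // Copy the literal text before the placeholder.
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find("}}") {
            Some(end) => {
                let key = &after[..end];
                if key == "input" {
                    out.push_str(whole);
                } else if let Some((_, value)) = fields.iter().find(|(k, _)| *k == key) {
                    out.push_str(value);
                } else {
                    // Assumption: an unknown placeholder is kept verbatim.
                    out.push_str(&rest[start..start + 2 + end + 2]);
                }
                rest = &after[end + 2..];
            }
            None => {
                // Unterminated "{{": copy the remainder as-is and stop.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```

Scanning once, rather than calling `replace` per field, means text substituted for one placeholder is never re-scanned for another.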
let agent = Arc::new(Agent::builder().provider(/* … */).build()?);
AgentStep::new("triage", agent, "Classify this ticket:\n{{body}}");
ToolStep
Reuses the tool's own input_schema, dispatches the node input straight into Tool::execute, and surfaces a tool error as a failed step. The workflow's extensions flow into the constructed ToolContext.
ToolStep::new("read", Arc::new(my_tool)).working_dir("/srv/project");
WorkflowStep
Runs a compiled nested workflow; a nested suspend propagates upward.
WorkflowStep::new("subflow", Arc::new(child_workflow));
The IR — WorkflowDef
The serializable graph. Identical whether produced by the builder or the UI.
pub struct WorkflowDef {
pub id: String,
pub input_schema: Value,
pub output_schema: Value,
pub nodes: Vec<WorkflowNode>,
pub edges: Vec<WorkflowEdge>,
pub entry: NodeId, // where execution begins
}
pub type NodeId = String; // string ids so xyflow owns them
pub struct WorkflowNode { pub id: NodeId, pub kind: NodeKind, pub ui: Option<UiHints> }
pub struct WorkflowEdge { pub from: NodeId, pub to: NodeId, pub kind: EdgeKind }
NodeKind
pub enum NodeKind {
Step { step_id: String, config: Value }, // run a registered step (config merged into input)
Map { mapping: MapSpec }, // pure JSON reshape
Parallel, // fan-out marker
Join { strategy: JoinStrategy }, // wait for parallel branches
Branch, // first matching `When` arm wins
Loop { mode: LoopMode, body: NodeId, condition: Option<Condition> },
}
pub enum LoopMode { DoWhile, DoUntil, ForEach { concurrency: usize } }
pub enum JoinStrategy { AllOrFail, AllSettled }
EdgeKind
pub enum EdgeKind {
Then, // sequential continuation
Fork, // Parallel node -> a branch
Merge, // a branch -> Join node
When { condition: Condition }, // a Branch arm
LoopBack, // a loop body's tail -> its Loop node (the only legal cycle)
}
MapSpec and UiHints
pub struct MapSpec { pub fields: HashMap<String, String> } // out_field -> JSON-pointer into scope
pub struct UiHints { pub x: Option<f64>, pub y: Option<f64>, pub label: Option<String>, pub color: Option<String> }
How the IR maps to xyflow
| Builder / Mastra | IR | xyflow |
|---|---|---|
| .then(step) | Step node + Then edge | default node + default edge |
| .parallel([...]) | Parallel → Fork edges → Join ← Merge edges | one source → N targets, join node |
| .branch([[cond, step]]) | Branch + When edges (labeled by predicate) | node with labeled edges |
| .dowhile / .dountil | Loop + LoopBack | node with an animated back edge |
| .foreach | Loop { ForEach } | node badged "foreach" |
| .map(fn) | Map node | "transform" node |
Condition
Serializable branch predicates over the run scope { input, state, steps: { <node_id>: <output> }, current }. Paths are JSON Pointers (/current/premium, /steps/score_1/value); a leading / is optional.
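How a predicate resolves a path against the scope can be sketched with std alone. Everything here is a stand-in rather than the crate's implementation: `Json` replaces serde_json::Value, `Condition` covers only a subset of the variants, `lookup` ignores arrays and JSON Pointer escapes, and the truthiness rule (null, false, 0, and "" are falsy) is an assumption.

```rust
use std::collections::BTreeMap;

// Stand-in for serde_json::Value, reduced to what the sketch needs.
#[derive(Debug, PartialEq)]
enum Json {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
    Obj(BTreeMap<String, Json>),
}

// A subset of the documented Condition variants.
enum Condition {
    Always,
    Eq { path: String, value: Json },
    Gt { path: String, value: f64 },
    Exists { path: String },
    Truthy { path: String },
    And(Vec<Condition>),
    Not(Box<Condition>),
}

// Walk an object-only pointer; the leading '/' is optional.
fn lookup<'a>(scope: &'a Json, path: &str) -> Option<&'a Json> {
    let path = path.strip_prefix('/').unwrap_or(path);
    path.split('/')
        .filter(|segment| !segment.is_empty())
        .try_fold(scope, |node, key| match node {
            Json::Obj(map) => map.get(key),
            _ => None,
        })
}

fn eval(cond: &Condition, scope: &Json) -> bool {
    match cond {
        Condition::Always => true,
        Condition::Eq { path, value } => lookup(scope, path) == Some(value),
        Condition::Gt { path, value } => {
            matches!(lookup(scope, path), Some(Json::Num(n)) if n > value)
        }
        Condition::Exists { path } => lookup(scope, path).is_some(),
        // Assumed truthiness rule; a missing path counts as falsy.
        Condition::Truthy { path } => match lookup(scope, path) {
            None | Some(Json::Null) | Some(Json::Bool(false)) => false,
            Some(Json::Num(n)) => *n != 0.0,
            Some(Json::Str(s)) => !s.is_empty(),
            Some(_) => true,
        },
        Condition::And(all) => all.iter().all(|c| eval(c, scope)),
        Condition::Not(inner) => !eval(inner, scope),
    }
}

// Convenience constructor for object literals in examples.
fn obj(pairs: Vec<(&str, Json)>) -> Json {
    Json::Obj(pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}
```

In this sketch a numeric comparison against a missing or non-numeric path evaluates to false rather than raising an error; whether the real `eval` behaves the same way is not stated in this reference.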
pub enum Condition {
Always,
Eq { path: String, value: Value },
Ne { path: String, value: Value },
Gt { path: String, value: f64 },
Lt { path: String, value: f64 },
Exists { path: String },
Truthy { path: String },
And(Vec<Condition>),
Or(Vec<Condition>),
Not(Box<Condition>),
}
impl Condition { pub fn eval(&self, scope: &Value) -> bool; }
Condition::And(vec![
Condition::Truthy { path: "current/active".into() },
Condition::Gt { path: "steps/score_1/value".into(), value: 0.8 },
]);
StepRegistry
Host-side map of step-id → Arc<dyn Step>. UI JSON references steps by id only; the registry supplies the implementations, and Workflow::compile resolves them.
let registry = StepRegistry::new(); // Arc<StepRegistry>
registry.register(Arc::new(my_step));
registry.get("upper"); // Option<Arc<dyn Step>>
registry.ids(); // Vec<String>
registry.catalog(); // Vec<StepInfo> — the UI builder palette
// Builder-style chaining:
let registry = StepRegistry::new()
.with(Arc::new(step_a))
.with(Arc::new(step_b));
pub struct StepInfo { pub id: String, pub description: String, pub input_schema: Value, pub output_schema: Value }
WorkflowBuilder
The programmatic front-end. Emits the same WorkflowDef the UI does; it auto-generates node ids and synthesizes Parallel/Join/Branch marker nodes.
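As a rough picture of what "auto-generates node ids" means for a chain of `.then` calls, here is a toy builder. It is a sketch under assumptions: `Builder` and `Edge` are hypothetical, the `<step_id>_<n>` id scheme is a guess, and marker-node synthesis for parallel and branch constructs is not modeled.

```rust
#[derive(Debug, PartialEq)]
struct Edge {
    from: String,
    to: String,
}

// Toy builder: records node ids and the sequential edges between them.
#[derive(Default)]
struct Builder {
    nodes: Vec<String>,
    edges: Vec<Edge>,
}

impl Builder {
    fn then(mut self, step_id: &str) -> Self {
        // Assumed id scheme: step id plus a 1-based position counter.
        let id = format!("{step_id}_{}", self.nodes.len() + 1);
        // Link the previous node, if any, to the new one.
        if let Some(prev) = self.nodes.last() {
            self.edges.push(Edge { from: prev.clone(), to: id.clone() });
        }
        self.nodes.push(id);
        self
    }
}
```

The first node pushed would play the role of `entry` in the resulting definition.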
let def = WorkflowBuilder::new("my-flow")
.input_schema(json!({ "type": "object" }))
.output_schema(json!({ "type": "object" }))
.then("step_a") // Step + Then
.then_with("step_b", json!({ "cfg": 1 })) // Step carrying static config
.map(MapSpec { fields }) // Map reshape
.parallel(&["fetch_x", "fetch_y"], JoinStrategy::AllOrFail)
.branch(vec![
(Condition::Truthy { path: "current/urgent".into() }, "escalate"),
(Condition::Always, "normal"),
])
.dowhile("poll", Condition::Truthy { path: "current/pending".into() })
.dountil("retry", Condition::Truthy { path: "current/ok".into() })
.commit(); // -> WorkflowDef
Workflow
A validated, executable workflow. compile resolves every Step node against the registry and checks structural sanity (single entry, branch arms are When, loop bodies exist, no stray cycles) before any execution.
pub fn compile(def: WorkflowDef, registry: &StepRegistry) -> Result<Arc<Workflow>>;
pub async fn start(&self, input: Value) -> Result<WorkflowResult>;
pub fn stream(self: &Arc<Self>, input: Value) -> WorkflowStream;
pub async fn resume(&self, run_id: &str, node_id: &NodeId, data: Value) -> Result<WorkflowResult>;
pub fn def(&self) -> &WorkflowDef;
Compilation rejects an unknown step id with CerseiError::Tool("unknown step: '…'") and a malformed graph with CerseiError::Config(…).
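One of the structural checks, "no stray cycles", can be sketched as a depth-first search that skips LoopBack edges, since those are the only legal back edges. This is an illustration only: `check_acyclic` and `visit` are hypothetical names, `EdgeKind` is cut down to two variants, and the reference does not say how compile actually performs the check.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq)]
enum EdgeKind {
    Then,
    LoopBack,
}

struct Edge {
    from: &'static str,
    to: &'static str,
    kind: EdgeKind,
}

// Depth-first walk. Marks: 1 = on the current path, 2 = fully explored.
fn visit<'a>(
    node: &'a str,
    next: &HashMap<&'a str, Vec<&'a str>>,
    mark: &mut HashMap<&'a str, u8>,
) -> Result<(), String> {
    match mark.get(node).copied() {
        Some(1) => return Err(format!("stray cycle through '{node}'")),
        Some(_) => return Ok(()),
        None => {}
    }
    mark.insert(node, 1);
    for &child in &next[node] {
        visit(child, next, mark)?;
    }
    mark.insert(node, 2);
    Ok(())
}

// Errors if the graph has a cycle that is not closed by a LoopBack edge.
fn check_acyclic(edges: &[Edge]) -> Result<(), String> {
    // Adjacency list over every edge except LoopBack.
    let mut next: HashMap<&str, Vec<&str>> = HashMap::new();
    for e in edges.iter().filter(|e| e.kind != EdgeKind::LoopBack) {
        next.entry(e.from).or_default().push(e.to);
        next.entry(e.to).or_default();
    }
    let mut mark = HashMap::new();
    let nodes: Vec<&str> = next.keys().copied().collect();
    for node in nodes {
        visit(node, &next, &mut mark)?;
    }
    Ok(())
}
```

A loop body that returns to its Loop node through LoopBack passes, while the same cycle built from Then edges is rejected.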
Events — WorkflowEvent / WorkflowStream
Every variant is Serialize (externally tagged, snake_case), so it can be forwarded to a browser to drive the live graph.
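For a front-end consuming these events, the externally tagged, snake_case shape would look roughly like the output of the hand-written formatter below. It is a stand-in, not the crate's serializer: the field types are assumptions, only three variants are shown, and Rust's `{:?}` quoting is used as an approximation of JSON string escaping that holds for simple ASCII text.

```rust
// Subset of the documented events; field types are assumed.
enum WorkflowEvent {
    StepFailed { node_id: String, error: String },
    LoopIteration { node_id: String, iteration: u32 },
    Error(String),
}

// Hand-written approximation of the externally tagged, snake_case form:
// the variant name becomes the single outer key.
fn to_json(ev: &WorkflowEvent) -> String {
    match ev {
        WorkflowEvent::StepFailed { node_id, error } => {
            format!(r#"{{"step_failed":{{"node_id":{node_id:?},"error":{error:?}}}}}"#)
        }
        WorkflowEvent::LoopIteration { node_id, iteration } => {
            format!(r#"{{"loop_iteration":{{"node_id":{node_id:?},"iteration":{iteration}}}}}"#)
        }
        WorkflowEvent::Error(message) => format!(r#"{{"error":{message:?}}}"#),
    }
}
```

A UI can therefore dispatch on the single top-level key of each message.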
pub enum WorkflowEvent {
WorkflowStarted { run_id },
StepStarted { node_id, step_id, input },
StepCompleted { node_id, output, duration_ms },
StepFailed { node_id, error },
StepSuspended { node_id, resume_schema },
BranchTaken { node_id, edge_to },
StateUpdated { state },
LoopIteration { node_id, iteration },
Inner { node_id, inner }, // bridged nested agent/workflow activity
WorkflowCompleted { result }, // carries the terminal WorkflowResult
Error(String),
}
pub struct WorkflowStream { /* … */ }
impl WorkflowStream {
pub async fn next(&mut self) -> Option<WorkflowEvent>;
pub fn cancel(&self);
pub fn resume(&self, node_id: NodeId, data: Value);
pub async fn collect(self) -> Option<WorkflowResult>; // drain, return the terminal result
}
Results — WorkflowResult / RunStatus
Shaped to match Mastra's JSON output so a UI can render it directly.
pub enum RunStatus { Running, Success, Failed, Suspended, Paused } // serde: lowercase
pub struct WorkflowResult {
pub run_id: String,
pub status: RunStatus,
pub input: Value,
pub steps: HashMap<NodeId, StepResult>,
pub result: Option<Value>, // final output on Success
pub state: Value, // terminal shared state
pub error: Option<String>,
pub suspended: Vec<SuspendPoint>,
}
pub struct StepResult {
pub status: RunStatus,
pub output: Option<Value>,
pub error: Option<String>,
pub started_at: i64,
pub ended_at: Option<i64>,
}
pub struct SuspendPoint { pub node_id: NodeId, pub resume_schema: Value, pub payload: Value }
Persistence — RunStore
Backs suspend/resume. The MVP is an in-memory store keyed by run_id; the same API is intended to gain a durable JSONL / cersei-memory backend so suspended runs survive a restart.
pub struct RunSnapshot { pub run_id, pub input, pub state: Value, pub outputs: HashMap<NodeId, Value> }
let store = RunStore::in_memory();
store.save(snapshot);
store.load(&run_id); // Option<RunSnapshot>
A compiled Workflow owns its store; start() saves a snapshot when a step suspends, and resume() reloads it, replays the already-computed nodes from the snapshot, and continues from the suspended node with the supplied resume_data.
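The save, reload, and replay cycle can be modeled in a few lines. This is a simplified sketch, not the engine: `Engine`, `Snapshot`, `Status`, and the `double` and `gate` steps are hypothetical, the workflow is a straight line of nodes, values are `i64` instead of JSON, and the store is a plain HashMap.

```rust
use std::collections::HashMap;

enum Outcome {
    Done(i64),
    Suspended,
}

// A step receives the previous output plus optional resume data.
type StepFn = fn(i64, Option<i64>) -> Outcome;

#[derive(Clone)]
struct Snapshot {
    input: i64,
    outputs: HashMap<String, i64>,
}

#[derive(Debug, PartialEq)]
enum Status {
    Success(i64),
    Suspended(String),
}

struct Engine {
    nodes: Vec<(String, StepFn)>,
    store: HashMap<String, Snapshot>, // keyed by run id
}

impl Engine {
    fn drive(&mut self, run_id: &str, mut snap: Snapshot, resume: Option<(&str, i64)>) -> Status {
        let mut current = snap.input;
        for (id, step) in &self.nodes {
            // Replay: reuse a recorded output instead of re-executing the node.
            if let Some(out) = snap.outputs.get(id) {
                current = *out;
                continue;
            }
            // Only the node being resumed sees the resume data.
            let data = resume.filter(|(node, _)| *node == id.as_str()).map(|(_, d)| d);
            match step(current, data) {
                Outcome::Done(out) => {
                    snap.outputs.insert(id.clone(), out);
                    current = out;
                }
                Outcome::Suspended => {
                    // Persist progress so a later resume can pick it up.
                    self.store.insert(run_id.to_string(), snap);
                    return Status::Suspended(id.clone());
                }
            }
        }
        Status::Success(current)
    }

    fn start(&mut self, run_id: &str, input: i64) -> Status {
        self.drive(run_id, Snapshot { input, outputs: HashMap::new() }, None)
    }

    fn resume(&mut self, run_id: &str, node_id: &str, data: i64) -> Option<Status> {
        let snap = self.store.get(run_id).cloned()?;
        Some(self.drive(run_id, snap, Some((node_id, data))))
    }
}

// Hypothetical steps: one always completes, one waits for resume data.
fn double(x: i64, _resume: Option<i64>) -> Outcome {
    Outcome::Done(x * 2)
}

fn gate(x: i64, resume: Option<i64>) -> Outcome {
    match resume {
        Some(bonus) => Outcome::Done(x + bonus),
        None => Outcome::Suspended,
    }
}
```

On resume, `double` is not called again: its recorded output is fed forward, and only `gate` runs, this time with the supplied data.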