Skip to content

Quick Start

A calculator where math operations run on a dedicated CommandRegistry wired to a root registry via InMemoryChannel. The root calls worker commands through the channel, events fan out across the pair, and private commands stay local. The same pattern works across real process boundaries once you swap the in-memory channel for a transport — see Channels.

See also the full multi-service example, which follows the same structure with a REPL on top.

This guide uses tokio as the async runtime. The coralstack-cmd-ipc crate itself is runtime-agnostic (it only depends on futures), but every example in this guide spawns channel drivers with tokio::spawn and drives main with #[tokio::main]. Swap tokio for async-std, smol, or futures::executor::ThreadPool if you prefer — the library doesn’t care.

Cargo.toml
[dependencies]
coralstack-cmd-ipc = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
serde_json = "1"
anyhow = "1"

A #[command_service] impl block turns an ordinary impl into a set of typed commands. Each #[command("id")] method becomes a handler. Declare request / response types with #[payload] — the macro auto-derives Serialize / Deserialize / JsonSchema against the cmd-ipc re-exports, and the registry generates the wire-level JSON Schema from those derives.

src/calc_service.rs
use coralstack_cmd_ipc::prelude::*;
#[payload]
pub struct CalcReq { pub a: f64, pub b: f64 }
#[payload]
pub struct CalcRes { pub result: f64 }
pub struct CalcService;
#[command_service]
impl CalcService {
#[command("calc.add", description = "Adds two numbers together")]
async fn add(&self, req: CalcReq) -> Result<CalcRes, CommandError> {
Ok(CalcRes { result: req.a + req.b })
}
#[command("calc.multiply", description = "Multiplies two numbers together")]
async fn multiply(&self, req: CalcReq) -> Result<CalcRes, CommandError> {
Ok(CalcRes { result: req.a * req.b })
}
}

For one-off commands that don’t warrant a service struct, use DynCommand with an inline closure, or attach #[command] directly to a free async fn.

Step 2: Set Up the Registry and Register Commands

Section titled “Step 2: Set Up the Registry and Register Commands”

A CommandRegistry owns the command table, routing, and channel connections. Build one per process (or per logical service) and install your commands with .register(&registry). Since register is async, we do this inside a #[tokio::main] function:

src/main.rs
use coralstack_cmd_ipc::prelude::*;
mod calc_service;
use calc_service::{CalcReq, CalcRes, CalcService};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Worker registry — owns the calc commands. `router_channel` tells
// it where to escalate commands it doesn't serve locally.
let worker = CommandRegistry::new(Config {
id: Some("calc-worker".into()),
router_channel: Some("main".into()),
..Default::default()
});
// `.register(&registry)` installs every `#[command]`-tagged method
// from `CalcService` in one shot.
CalcService.register(&worker).await?;
// Root registry — no `router_channel` (it's the terminal point).
let root = CommandRegistry::new(Config {
id: Some("main".into()),
..Default::default()
});
// Private command, registered via `DynCommand` for a one-off closure.
root.register_command(DynCommand::new(
"_main.log",
|payload: serde_json::Value| async move {
eprintln!("[Main] {payload}");
Ok(serde_json::Value::Null)
},
)).await?;
// ... (Step 3 continues inside the same `main`)
Ok(())
}

A channel is the pipe between two registries. InMemoryChannel::pair wires two halves of an in-process pair — the same API a real transport (WebSocket, stdio, gRPC, etc.) would expose via the CommandChannel trait.

// continues inside `async fn main`:
// `InMemoryChannel::pair(peer_a_id, peer_b_id)` returns the two halves
// from each side's perspective. The `Arc<InMemoryChannel>` values
// coerce to `Arc<dyn CommandChannel>` at the `register_channel` call
// site automatically.
let (ch_for_root, ch_for_worker) = InMemoryChannel::pair("calc-worker", "main");
// Attach the channel to each registry. `register_channel` returns a
// driver future that we spawn onto tokio. The driver consumes inbound
// messages and dispatches them into the registry until the channel
// closes.
let root_driver = root.register_channel(ch_for_root).await?;
let worker_driver = worker.register_channel(ch_for_worker).await?;
tokio::spawn(root_driver);
tokio::spawn(worker_driver);

Once both drivers are running, the registries perform a handshake: each side advertises its local commands via list.commands.response, and any new register_command call on a child registry escalates upstream. From this point on, the root sees calc.add and calc.multiply as routable commands even though they live on the worker.

With the channel up, the root registry dispatches through it for any command it doesn’t own locally. Two dispatch modes:

// Strict mode — compile-time type identity. The `#[command_service]`
// macro generates a `calc_service::Add` (and `Multiply`) struct you
// can reach without constructing a service instance. The request and
// response types are pinned by the `Command` impl, so no type
// ascription is needed on the `let`.
let sum = root
.execute::<calc_service::Add>(CalcReq { a: 5.0, b: 3.0 })
.await?;
println!("5 + 3 = {}", sum.result); // 8
let product = root
.execute::<calc_service::Multiply>(CalcReq { a: 4.0, b: 7.0 })
.await?;
println!("4 × 7 = {}", product.result); // 28
// Loose mode — runtime-known command id. Raw JSON in, raw JSON out.
// Use this for plugins, FFI, scripting hosts, or anywhere the id
// only becomes known at runtime.
root.execute_dyn(
"_main.log",
serde_json::json!({ "level": "info", "message": "Calculations complete!" }),
).await?;

Strict mode (execute::<calc_service::Add>(..)) and loose mode (execute_dyn("calc.add", ..)) cover both ends of the typing spectrum. See Type Safety for the tradeoffs and when to reach for each.

Events are fire-and-forget broadcasts — no response, no routing escalation, just a payload that fans out to every connected registry (unless the id is private). Define a payload struct with #[event("id")] and the struct itself becomes the typed event.

// src/calc_service.rs (or wherever your shared types live)
use coralstack_cmd_ipc::prelude::*;
/// Worker finished initializing.
#[event("worker.ready")]
pub struct WorkerReady {
pub worker_id: String,
pub command_count: u32,
}

on::<E>(cb) subscribes by the compile-time E::ID and delivers the payload already deserialized into E — no casting, no JSON juggling:

let _unsub = root.on::<WorkerReady>(|event| {
println!("{} ready with {} commands", event.worker_id, event.command_count);
});

The return value is an impl FnOnce() that unsubscribes when called. Drop it (or call it) to remove just this listener; ignore it to leave the listener installed for the lifetime of the registry.

The payload struct is the event — no separate marker type, no emit_event(id, payload) tuple. Just construct and emit:

worker.emit(WorkerReady {
worker_id: "calc-worker".into(),
command_count: 2,
})?;

emit fans out to every connected channel (with dedup on the message id to prevent echo loops in meshes) and also dispatches to local listeners on the same registry.

For event ids that only exist at runtime (plugin runtimes, FFI, scripting hosts), use DynEvent on the emit side and on_dyn on the listen side — same machinery, runtime ids + raw JSON payloads:

registry.emit(DynEvent::new(runtime_id, serde_json::json!({ "ok": true })))?;
let _unsub = registry.on_dyn(runtime_id, |payload: serde_json::Value| {
println!("got event: {payload}");
});

DynEvent::new(id, payload) takes the id + payload; .schema(..) is an optional builder method for MCP/tooling consumers.

  • Commands are typed functions#[command_service] + #[command] binds methods to IDs; Serialize / Deserialize / JsonSchema derives drive the wire schema.
  • Events are typed payloads#[event] on a payload struct turns it into a first-class event type usable with emit / on.
  • Registries own the command table — one per process or logical service, wired by channels.
  • Channels are pluggableInMemoryChannel for in-process tests, implement the CommandChannel trait for any transport.
  • Private ids (prefixed with _) stay local — for both commands and events.
  • Runtime is your choice — this guide uses tokio, but any Future executor works.