Skip to content

API Reference

Reference for the Rust crates. Full rustdoc with all generics, trait bounds, and method signatures is published on docs.rs — this page is a high-level cheatsheet.

Cratedocs.rscrates.ioPurpose
coralstack-cmd-ipcdocs.rscrates.ioCore registry, channels, protocol
coralstack-cmd-ipc-macrosdocs.rscrates.io#[command] / #[command_service] / #[event] / #[payload] attribute macros
coralstack-cmd-ipc-mcpdocs.rscrates.ioMCP server adapter (via rmcp)
use coralstack_cmd_ipc::prelude::*;

Brings into scope: command, command_service, event, payload, BoxedDynCommand, ChannelError, Command, CommandChannel, CommandDef, CommandError, CommandRegistry, CommandSchema, Config, DynCommand, DynEvent, Event, InMemoryChannel.

The central hub — mirrors the TypeScript CommandRegistry 1:1.

use coralstack_cmd_ipc::prelude::*;
use std::time::Duration;
let registry = CommandRegistry::new(Config {
id: Some("main".into()),
router_channel: None,
request_ttl: Duration::from_secs(30),
event_ttl: Duration::from_secs(5),
max_in_flight_per_channel: 256,
});
FieldTypeDescription
idOption<String>Registry identifier for logging
router_channelOption<String>Channel ID for escalating unknown commands
request_ttlDurationTimeout for pending requests
event_ttlDurationTTL for event deduplication
max_in_flight_per_channelusizePer-channel cap on concurrently in-flight handler futures (default 256; 0 disables)

Every field has a sensible default — call Config::default() or use ..Default::default() in a struct literal and you can omit anything you don’t care about:

let registry = CommandRegistry::new(Config {
id: Some("main".into()),
..Default::default() // fills in router_channel, request_ttl, event_ttl, max_in_flight_per_channel
});

The fields are only syntactically required when you list them out one by one (Rust struct literals must name every field). Functionally none of them are mandatory.

MethodMirrors TSNotes
register_command(cmd)registerCommandSingle entry point. Takes any C: Command — typed #[command]-macro structs for compile-time commands, DynCommand for runtime-constructed commands
register_channel(channel)registerChannelReturns a driver future — spawn it on your executor
execute::<C>(req)strict executeCommand<K>C: Command pins request and response types
execute_dyn(id, req: Value)loose executeCommandRaw JSON in, raw JSON out — for runtime-known ids
emit(event)emitEventSingle entry point. Takes any E: Event — typed #[event]-macro structs or DynEvent
on::<E>(cb)typed addEventListenerE: Event + DeserializeOwned; callback receives deserialized E
on_dyn(id, cb)dynamic addEventListenerRuntime id; callback receives raw Value
list_commands()listCommandsReturns Vec<CommandDef>
list_channels()listChannelsReturns connected channel IDs
dispose().awaitdisposeAsync — awaits each channel.close() before returning
id()id propertyRegistry identifier

register_channel does not spawn anything — it returns a future you own. Poll it with whatever executor you run elsewhere:

// Tokio
let driver = registry.register_channel(channel).await?;
tokio::spawn(driver);
// futures::executor / smol / async-std — same shape, different spawner.

Forgetting to spawn the driver means the registry never processes incoming messages from that channel. dispose() will still work (it awaits each channel.close()), but until then the channel is silent from the registry’s point of view.

Within the driver task, the pump dispatches handlers concurrently: each incoming message’s handler future is pushed into a FuturesUnordered and cooperatively interleaved with the next recv. A handler that awaits external work (timer, network, forwarded call) no longer blocks subsequent messages on the same channel — fast commands, forwarded responses, and events all continue flowing.

The number of simultaneously in-flight handlers on one channel is capped by Config::max_in_flight_per_channel (default 256). At the cap the pump applies backpressure to the channel rather than dropping messages; set to 0 to disable the cap.

This is cooperative concurrency on a single executor task, not multi-thread parallelism. For true CPU-parallel dispatch, wrap your handler body in your runtime’s spawn (e.g. tokio::spawn) — the crate itself stays runtime-agnostic.

The registry rejects pending execute calls after request_ttl with a Timeout error. Late responses that arrive after expiry find no pending handler and are silently dropped (per spec). For slow handlers — long JS execution in a plugin, large HTTP fetches — raise request_ttl above the worst-case handler duration, or enforce a shorter timeout on the handler side.

Model void as ():

#[command("worker.reset")]
async fn reset(&self) -> Result<(), CommandError> { /* ... */ Ok(()) }
// Unit struct = void event
#[event("worker.tick")]
struct WorkerTick;

The macro detects () for request or response and omits that slot from the advertised schema. On the wire the registry omits the request, result, and payload fields entirely (rather than sending null), per the JSON schemas in spec/.

Strict-mode typing — the Rust equivalent of CommandSchemaMap.

use coralstack_cmd_ipc::prelude::*;
#[payload]
struct AddReq { a: i64, b: i64 }
struct Add;
impl Command for Add {
const ID: &'static str = "math.add";
const DESCRIPTION: Option<&'static str> = Some("Add two integers");
type Request = AddReq;
type Response = i64;
async fn handle(&self, req: AddReq) -> Result<i64, CommandError> {
Ok(req.a + req.b)
}
// Optional — omit to skip schema advertising. Without a schema,
// cross-language peers fall back to permissive validation.
fn schema(&self) -> Option<CommandSchema> {
Some(CommandSchema::empty().with_request(
serde_json::to_value(coralstack_cmd_ipc::schemars::schema_for!(AddReq)).unwrap(),
))
}
}
// Register the typed command. The id, description, schema, and
// request/response adapter are all pulled from the `Command` impl.
registry.register_command(Add).await?;
// Strict call — request and response types checked at compile time.
let result: i64 = registry.execute::<Add>(AddReq { a: 2, b: 3 }).await?;

In practice you’d use #[command("math.add")] async fn add(&self, req: AddReq) -> Result<i64, CommandError> { ... } — the #[command] / #[command_service] macros generate the Command impl (including schema()) and a register_* / .register(&registry) helper that calls register_command for you. See Defining Commands.

When the id or schema is only known at runtime (plugin runtimes, FFI, scripting hosts), build a DynCommand and register it the same way. There is no separate “dyn” registration method — DynCommand implements Command, so it goes through register_command alongside typed commands:

use serde_json::{json, Value};
// Dynamic id, `Value` in/out — simplest form.
let cmd = DynCommand::new(runtime_id, |req: Value| async move {
Ok(json!({ "echoed": req }))
});
registry.register_command(cmd).await?;
// Dynamic id, typed in/out — works too: types are inferred from the closure.
let cmd = DynCommand::new(runtime_id, |req: AddReq| async move {
Ok(req.a + req.b)
})
.description("Runtime adder")
.request_schema(runtime_request_schema)
.response_schema(runtime_response_schema);
registry.register_command(cmd).await?;

Builders:

  • .description(..) — human-readable description surfaced to MCP clients and list_commands.
  • .schema(CommandSchema) — attach a full request+response schema pair.
  • .request_schema(Value) / .response_schema(Value) — set one slot at a time.

For a Vec of heterogeneous runtime commands (a plugin host holding many handlers), use BoxedDynCommand:

use coralstack_cmd_ipc::BoxedDynCommand;
use serde_json::json;
let a: BoxedDynCommand = DynCommand::boxed("plugin.hello", |_| async { Ok(json!("hi")) });
let b: BoxedDynCommand = DynCommand::boxed("plugin.bye", |_| async { Ok(json!("bye")) });
let commands: Vec<BoxedDynCommand> = vec![a, b];
for cmd in commands {
registry.register_command(cmd).await?;
}

DynCommand::boxed erases the handler’s concrete closure type into BoxedHandler, giving every command the same static type so they can share a collection.

CommandSchema::permissive() advertises a fully open any → any schema — the right default when a plugin’s exported function has no introspectable types (e.g. a QuickJS function(req) { ... }). CommandSchema::empty() advertises no schema on either slot (void/void); attach real schemas via .with_request(..) / .with_response(..).

Typed events — the event payload struct is the event. Attach #[event("id")] to a struct and it becomes a typed event usable with emit / on. The macro auto-derives Serialize / Deserialize / JsonSchema against the re-exports from coralstack-cmd-ipc, so user crates don’t add those dependencies themselves.

use coralstack_cmd_ipc::prelude::*;
/// Worker has finished initializing.
#[event("worker.ready")]
pub struct WorkerReady {
pub worker_id: String,
pub command_count: u32,
}
// Void event — a unit struct emits no payload on the wire.
#[event("worker.tick")]
pub struct WorkerTick;
// Emit — fully typed.
registry.emit(WorkerReady {
worker_id: "w1".into(),
command_count: 2,
})?;
registry.emit(WorkerTick)?;
// Listen — callback receives a deserialized `WorkerReady`.
let _unsub = registry.on::<WorkerReady>(|event| {
println!("{} ready with {}", event.worker_id, event.command_count);
});

When the id or payload shape is only known at runtime, build a DynEvent:

use serde_json::json;
registry.emit(DynEvent::new("plugin.foo", json!({ "ok": true })))?;
let _unsub = registry.on_dyn("plugin.foo", |payload| {
println!("got dynamic event: {payload}");
});

Private events (id starts with _) fire only to local listeners — never broadcast across channels. Same rule as for commands.

#[async_trait]
pub trait CommandChannel: Send + Sync {
fn id(&self) -> &str;
async fn send(&self, message: Message) -> Result<(), ChannelError>;
fn subscribe(&self) -> Receiver<Message>;
async fn close(&self) -> Result<(), ChannelError>;
}

Built-in implementation:

  • InMemoryChannelInMemoryChannel::pair(local_id, peer_id) returns two Arc<InMemoryChannel> halves wired together. Ideal for same-process tests and single-process multi-registry setups. See InMemoryChannel.

Implement the trait for custom transports (WebSocket, gRPC, stdio, etc.). The wire Message type is byte-identical to the TypeScript union — see Protocol.

Re-exported from coralstack-cmd-ipc-macros. See Defining Commands for full usage.

  • #[command("id", description = "...")] — attribute on a free async fn or an impl method. For a free fn named greet, also emits a register_greet(&registry).await? helper and a GreetCommand type usable with strict execute::<GreetCommand>(..).
  • #[command_service] — attribute on an impl Host block. Generates:
    • Host.register(&registry).await? — installs every tagged method.
    • A nested pub(super) mod host_snake_case_name containing one wrapper type per method, reachable for strict execute: registry.execute::<host_name::MethodName>(req).await?.
  • #[event("id")] — attribute on a payload struct. Auto-derives Serialize / Deserialize / JsonSchema and emits an impl Event for Struct with id and schemars-derived schema. The struct is then usable as registry.emit(Struct { .. }) and registry.on::<Struct>(cb). Unit structs (struct Foo;) produce void events (no payload on the wire).
  • #[payload] — attribute on any struct (typically command request/response types). Auto-derives Serialize / Deserialize / JsonSchema against the serde / schemars re-exports from coralstack-cmd-ipc, so user crates don’t need to add those dependencies themselves. Additive — extra derives (Clone, Debug, …) can be stacked on top normally.
TypePurpose
CommandErrorReturned from command handlers; serialized as ExecuteError on the wire
ChannelErrorTransport-level failures
ExecuteErrorCodeEnum of wire-level error codes (CommandNotFound, ValidationError, etc.)
RegisterErrorCodeEnum of registration error codes (CommandAlreadyRegistered)

coralstack-cmd-ipc-mcp::McpServerChannel implements CommandChannel, so it registers on the registry like any other channel. It wraps an rmcp server handler internally.

use std::sync::Arc;
use coralstack_cmd_ipc_mcp::McpServerChannel;
let mcp = Arc::new(McpServerChannel::new("mcp"));
let driver = registry.register_channel(mcp.clone()).await?;
tokio::spawn(driver);
mcp.serve_stdio().await?;

Pure translation between MCP and cmd-ipc wire messages — no registry reference held. MCP tools/list / tools/call emit ListCommandsRequest / ExecuteCommandRequest through recv(); the registry’s ExecuteCommandResponse comes back via send() and is correlated to the waiting MCP call by thid.

MethodPurpose
new(id)Create the channel with a given id
with_implementation(name, version)Override the serverInfo reported to MCP clients
with_instructions(s)Attach an instructions string for MCP clients
with_timeout(Duration)Timeout for MCP-originated requests awaiting the registry
serve(transport)Drive the MCP protocol over any rmcp transport (stdio, TCP, duplex, WebSocket, …)
serve_stdio()Convenience wrapper: serve(rmcp::transport::io::stdio())
into_handler()Returns an rmcp ServerHandler — use this to plug into HTTP frameworks (axum, actix, warp, …) as a per-session handler factory

Stdio ships in the crate by default. For HTTP or any other transport, enable the relevant rmcp feature in your own Cargo.toml and pass the transport to serve() (or use into_handler() for HTTP’s per-session model).

Private commands (prefixed with _) are never exposed as tools.