Files
spaceshell/crates/spaceshd/src/server.rs
T
vasyansk 56ba1723b9 feat(daemon): EventLog ring model with read-flags
Add event_log.rs with EventLog (capped VecDeque, monotonic ids, mark_read,
snapshot/restore) and EventLogState. Register mod in main.rs. Stub
Cmd::EventLog and Cmd::MarkRead arms in server.rs to keep the exhaustive
match compiling; full wiring follows in Task 4.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 07:21:26 +07:00

860 lines
40 KiB
Rust

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use base64::Engine;
use spacesh_proto::codec::{read_frame, write_frame};
use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId, WorkspaceId};
use spacesh_proto::status::SurfaceState;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{mpsc, oneshot};
use crate::persist::{self, Persister};
use crate::registry::Registry;
use crate::state_store::StateStore;
use crate::surface::{SurfaceMsg};
/// Per-client outbound channel: the router pushes envelopes the client task writes out.
///
/// The router keeps one sender per connected client (and each in-flight request
/// carries a clone); the receiving half is drained by the writer task spawned in
/// `handle_client`.
type ClientTx = mpsc::Sender<Envelope>;
/// Messages into the single router task.
///
/// Every piece of shared daemon state (registry, client table, subscriptions) is
/// owned by the router, so all mutations are expressed as one of these messages.
enum ServerMsg {
/// A request from a client; reply routed to that client's `out`.
/// `id` is the request id echoed back in the response envelope.
Request { id: u64, cmd: Cmd, client: ClientId, out: ClientTx },
/// Forward an output chunk to all subscribers of `surface_id`.
Output { surface_id: SurfaceId, bytes: Vec<u8> },
/// A surface process exited with the given `code`; broadcast to every client.
Exit { surface_id: SurfaceId, code: i32 },
/// Register a new client's event sink.
ClientConnected { client: ClientId, out: ClientTx },
/// Drop a client and all its subscriptions.
ClientDisconnected { client: ClientId },
/// A status change detected internally (OSC 133 / fallback) by a surface actor.
/// Ignored by the router unless the surface is currently running.
StateDetected { surface_id: SurfaceId, state: SurfaceState },
}
/// Identifier for one accepted connection; `serve` hands these out from an
/// incrementing counter starting at 0.
type ClientId = u64;
/// Bind the daemon's Unix socket at `socket` and serve clients until the router stops.
///
/// Sets up the single router task (which owns all daemon state), two forwarding
/// tasks that funnel surface exit / state notifications into it, and the
/// persister, then accepts connections and spawns one `handle_client` task per
/// client.
///
/// # Errors
/// Returns an error if the socket cannot be bound or if `accept` fails.
pub async fn serve(socket: &Path, store: Arc<dyn StateStore>) -> Result<()> {
    let listener = UnixListener::bind(socket)?;
    let (router_tx, router_rx) = mpsc::channel::<ServerMsg>(256);

    // Exit events from surfaces are funneled into the router.
    let (exit_tx, mut exit_rx) = mpsc::unbounded_channel::<(SurfaceId, i32)>();
    let router_for_exit = router_tx.clone();
    tokio::spawn(async move {
        while let Some((sid, code)) = exit_rx.recv().await {
            let _ = router_for_exit.send(ServerMsg::Exit { surface_id: sid, code }).await;
        }
    });

    // Same funnel for status changes detected by the surface actors.
    let (state_tx, mut state_rx) = mpsc::unbounded_channel::<(SurfaceId, SurfaceState)>();
    let router_for_state = router_tx.clone();
    tokio::spawn(async move {
        while let Some((sid, st)) = state_rx.recv().await {
            let _ = router_for_state.send(ServerMsg::StateDetected { surface_id: sid, state: st }).await;
        }
    });

    let persister = persist::spawn(store.clone(), Duration::from_millis(500));
    // NOTE(review): a failed load silently starts from the default (empty) state —
    // confirm that is intended for a corrupt state file and not only a missing one.
    let initial = store.load().unwrap_or_default();
    let mut shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx, state_tx, persister, initial));

    let mut next_client: ClientId = 0;
    loop {
        // Race the accept against the router's join handle so a stopped router ends
        // the accept loop right away, instead of only being noticed after the next
        // client has already connected (and been handed to a dead router).
        tokio::select! {
            _ = &mut shutdown => break,
            accepted = listener.accept() => {
                let (stream, _addr) = accepted?;
                let client_id = next_client;
                next_client += 1;
                tokio::spawn(handle_client(stream, client_id, router_tx.clone()));
            }
        }
    }
    Ok(())
}
async fn handle_client(stream: UnixStream, client_id: ClientId, router_tx: mpsc::Sender<ServerMsg>) {
let (mut read_half, mut write_half) = stream.into_split();
let (out_tx, mut out_rx) = mpsc::channel::<Envelope>(256);
let _ = router_tx
.send(ServerMsg::ClientConnected { client: client_id, out: out_tx.clone() })
.await;
// Writer task: drain outbound envelopes to the socket.
let writer = tokio::spawn(async move {
while let Some(env) = out_rx.recv().await {
if write_frame(&mut write_half, &env).await.is_err() {
break;
}
}
});
// Reader loop: parse frames and forward requests to the router.
loop {
match read_frame(&mut read_half).await {
Ok(Some(Envelope::Req { id, cmd })) => {
let _ = router_tx
.send(ServerMsg::Request { id, cmd, client: client_id, out: out_tx.clone() })
.await;
}
Ok(Some(_)) => { /* clients don't send res/evt; ignore */ }
Ok(None) => break, // EOF
Err(_) => break,
}
}
let _ = router_tx.send(ServerMsg::ClientDisconnected { client: client_id }).await;
writer.abort();
}
/// The single task that owns all daemon state.
///
/// Holds the surface/workspace registry (seeded from `initial`), the table of
/// connected clients, and the per-surface subscription lists, and processes
/// `ServerMsg`s one at a time until every sender is gone.
async fn router(
    mut rx: mpsc::Receiver<ServerMsg>,
    router_tx: mpsc::Sender<ServerMsg>,
    exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
    state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
    persister: Persister,
    initial: crate::state_store::PersistState,
) {
    let mut reg = Registry::new();
    reg.restore(initial);

    let mut clients: HashMap<ClientId, ClientTx> = HashMap::new();
    // surface_id → ids of the clients attached to that surface.
    let mut subs: HashMap<SurfaceId, Vec<ClientId>> = HashMap::new();

    while let Some(msg) = rx.recv().await {
        match msg {
            ServerMsg::Request { id, cmd, client, out } => {
                handle_request(
                    id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &state_tx,
                    &persister,
                )
                .await;
            }
            ServerMsg::Output { surface_id, bytes } => {
                // Nobody attached: the chunk is simply dropped.
                let Some(subscribers) = subs.get(&surface_id) else { continue };
                let evt = Envelope::Evt(Evt::Output { surface_id: surface_id.clone(), bytes });
                for sink in subscribers.iter().filter_map(|c| clients.get(c)) {
                    // Non-blocking: a client whose queue is full misses this chunk.
                    let _ = sink.try_send(evt.clone());
                }
            }
            ServerMsg::StateDetected { surface_id, state } => {
                if reg.is_running(&surface_id) {
                    reg.set_state(&surface_id, state);
                    let evt = Envelope::Evt(Evt::State { surface_id, state });
                    broadcast_evt(&clients, &evt);
                }
            }
            ServerMsg::Exit { surface_id, code } => {
                reg.mark_stopped(&surface_id);
                reg.drop_state(&surface_id);
                let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code });
                broadcast_evt(&clients, &evt);
            }
            ServerMsg::ClientConnected { client, out } => {
                clients.insert(client, out);
            }
            ServerMsg::ClientDisconnected { client } => {
                clients.remove(&client);
                subs.values_mut().for_each(|list| list.retain(|c| *c != client));
            }
        }
    }
}
/// Queue a copy of `evt` for every connected client.
///
/// Delivery is best-effort: `try_send` never blocks, so a client whose queue is
/// full (or already closed) just misses the event.
fn broadcast_evt(clients: &HashMap<ClientId, ClientTx>, evt: &Envelope) {
    clients.values().for_each(|sink| {
        let _ = sink.try_send(evt.clone());
    });
}
fn ok(id: u64, data: serde_json::Value) -> Envelope {
Envelope::Res { id, ok: true, data, error: None }
}
/// Build the failure response for request `id`: a null payload plus an error
/// body with a machine-readable `code` and a human-readable `msg`.
fn err(id: u64, code: &str, msg: &str) -> Envelope {
    let body = ErrorBody { code: code.into(), msg: msg.into() };
    Envelope::Res {
        id,
        ok: false,
        data: serde_json::Value::Null,
        error: Some(body),
    }
}
/// Work out the extra environment for a surface about to be spawned.
///
/// Returns `(env, hooks_active)`:
/// - agent commands get the hook environment from `hooks::prepare`; hooks count
///   as active only when that environment is non-empty;
/// - zsh shells get the shell-integration environment, with hooks inactive;
/// - anything else gets no extra environment.
fn spawn_env(sid: &SurfaceId, spec: &spacesh_proto::workspace::SurfaceSpec) -> (Vec<(String, String)>, bool) {
    let is_agent = crate::hooks::is_agent(&spec.command, spec.agent_label.as_deref());
    if is_agent {
        let hook_env = crate::hooks::prepare(sid, &crate::hooks::spacesh_bin());
        let has_hooks = !hook_env.is_empty();
        return (hook_env, has_hooks);
    }
    if crate::hooks::is_zsh(&spec.command) {
        return (crate::hooks::shell_env(sid), false);
    }
    (Vec::new(), false)
}
/// Broadcast the current layout tree of workspace `ws_id` as a `LayoutChanged`
/// event. Does nothing when the workspace is not in the registry.
fn emit_layout(reg: &Registry, ws_id: &WorkspaceId, clients: &HashMap<ClientId, ClientTx>) {
    let Some(w) = reg.workspace(ws_id) else { return };
    let evt = Envelope::Evt(Evt::LayoutChanged {
        workspace_id: ws_id.clone(),
        layout: w.layout.clone(),
    });
    broadcast_evt(clients, &evt);
}
#[allow(clippy::too_many_arguments)]
async fn handle_request(
id: u64,
cmd: Cmd,
client: ClientId,
out: ClientTx,
reg: &mut Registry,
subs: &mut HashMap<SurfaceId, Vec<ClientId>>,
clients: &HashMap<ClientId, ClientTx>,
router_tx: &mpsc::Sender<ServerMsg>,
exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>,
state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
persister: &Persister,
) {
use spacesh_proto::message::SplitDir;
use spacesh_proto::layout::{LayoutNode, Orient};
use spacesh_proto::workspace::SurfaceSpec;
match cmd {
Cmd::Open { path } => {
let (ws_id, created) = reg.open_workspace(path.into());
if created {
if let Some(view) = reg.workspace_view(&ws_id) {
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view }));
}
persister.mark_dirty(reg.persist_state());
}
let _ = out.send(ok(id, serde_json::json!({ "workspace_id": ws_id.0 }))).await;
}
Cmd::NewSurface { workspace_id, command, args, cols, rows } => {
let Some(ws) = reg.workspace(&workspace_id).cloned() else {
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return;
};
let sid = reg.new_surface_id();
let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into()));
let spec = SurfaceSpec {
command: shell, args: args.clone(), cwd: ws.path.clone(),
agent_label: command, cols, rows, autostart: false,
};
let (env, hooks_active) = spawn_env(&sid, &spec);
match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) {
Ok(handle) => {
spawn_output_bridge(sid.clone(), &handle, router_tx.clone());
reg.set_live(handle);
reg.set_state(&sid, spacesh_proto::SurfaceState::Idle);
reg.add_surface_spec(&workspace_id, sid.clone(), spec);
// First panel of an empty workspace becomes the root leaf.
if let Some(w) = reg.workspace_mut(&workspace_id) {
if w.layout.is_none() {
w.layout = Some(LayoutNode::leaf(sid.clone()));
}
}
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated {
surface_id: sid.clone(), workspace_id: workspace_id.clone(),
}));
emit_layout(reg, &workspace_id, clients);
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::json!({ "surface_id": sid.0 }))).await;
}
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; }
}
}
Cmd::SplitSurface { surface_id, dir, command, args } => {
let Some(ws_id) = reg.workspace_of(&surface_id) else {
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
};
let ws = reg.workspace(&ws_id).cloned().unwrap();
let new_sid = reg.new_surface_id();
let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into()));
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
let (env, hooks_active) = spawn_env(&new_sid, &spec);
match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) {
Ok(handle) => {
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
reg.set_live(handle);
reg.set_state(&new_sid, spacesh_proto::SurfaceState::Idle);
reg.add_surface_spec(&ws_id, new_sid.clone(), spec);
let orient = match dir { SplitDir::Right => Orient::H, SplitDir::Down => Orient::V };
if let Some(w) = reg.workspace_mut(&ws_id) {
let mut root = w.layout.take().unwrap_or_else(|| LayoutNode::leaf(surface_id.clone()));
spacesh_core::ops::split_leaf(&mut root, &surface_id, orient, true, new_sid.clone());
w.layout = Some(root);
}
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: new_sid.clone(), workspace_id: ws_id.clone() }));
emit_layout(reg, &ws_id, clients);
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::json!({ "surface_id": new_sid.0 }))).await;
}
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; }
}
}
Cmd::SetRatios { workspace_id, node_path, ratios } => {
let ok_set = reg.workspace_mut(&workspace_id).map(|w| {
if let Some(l) = w.layout.as_mut() {
spacesh_core::ops::set_ratios(l, &node_path, &ratios)
} else { false }
}).unwrap_or(false);
if ok_set {
emit_layout(reg, &workspace_id, clients);
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::Value::Null)).await;
} else {
let _ = out.send(err(id, "BAD_REQUEST", "invalid node_path or ratios")).await;
}
}
Cmd::MoveSurface { surface_id, target_surface_id, edge } => {
let Some(ws_id) = reg.workspace_of(&surface_id) else {
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
};
if let Some(w) = reg.workspace_mut(&ws_id) {
if let Some(root) = w.layout.take() {
w.layout = Some(spacesh_core::ops::move_leaf(root, &surface_id, &target_surface_id, edge));
}
}
emit_layout(reg, &ws_id, clients);
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::Value::Null)).await;
}
Cmd::ApplyPreset { workspace_id, preset_id, slots } => {
let Some(count) = spacesh_core::presets::slot_count(&preset_id) else {
let _ = out.send(err(id, "BAD_REQUEST", "unknown preset")).await; return;
};
let Some(ws) = reg.workspace(&workspace_id).cloned() else {
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return;
};
// Kill current panels of this workspace.
let existing: Vec<SurfaceId> = ws.surfaces.keys().cloned().collect();
for sid in &existing {
if let Some(h) = reg.live(sid) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; }
reg.remove_surface(sid);
subs.remove(sid);
}
// Spawn `count` panels (slots padded/truncated to count).
let mut new_ids = Vec::new();
for i in 0..count {
let slot = slots.get(i);
let new_sid = reg.new_surface_id();
let command = slot.and_then(|s| s.command.clone());
let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into()));
let args = slot.map(|s| s.args.clone()).unwrap_or_default();
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
let (env, hooks_active) = spawn_env(&new_sid, &spec);
match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) {
Ok(handle) => {
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
reg.set_live(handle);
reg.set_state(&new_sid, spacesh_proto::SurfaceState::Idle);
reg.add_surface_spec(&workspace_id, new_sid.clone(), spec);
new_ids.push(new_sid);
}
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; return; }
}
}
if let Some(tree) = spacesh_core::presets::build(&preset_id, &new_ids) {
if let Some(w) = reg.workspace_mut(&workspace_id) { w.layout = Some(tree); }
}
for sid in &new_ids {
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: sid.clone(), workspace_id: workspace_id.clone() }));
}
emit_layout(reg, &workspace_id, clients);
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::json!({ "surface_ids": new_ids.iter().map(|s| s.0.clone()).collect::<Vec<_>>() }))).await;
}
Cmd::RestartSurface { surface_id } => {
if reg.is_running(&surface_id) {
let _ = out.send(ok(id, serde_json::Value::Null)).await; return; // already running
}
let Some(spec) = reg.surface_spec(&surface_id) else {
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
};
let ws_id = reg.workspace_of(&surface_id).unwrap();
let (env, hooks_active) = spawn_env(&surface_id, &spec);
match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) {
Ok(handle) => {
spawn_output_bridge(surface_id.clone(), &handle, router_tx.clone());
reg.set_live(handle);
reg.set_state(&surface_id, spacesh_proto::SurfaceState::Idle);
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceRestarted { surface_id: surface_id.clone() }));
let _ = out.send(ok(id, serde_json::Value::Null)).await;
}
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; }
}
}
Cmd::CloseWorkspace { workspace_id } => {
let ids = reg.close_workspace(&workspace_id);
for sid in &ids { crate::hooks::cleanup(sid); crate::hooks::cleanup_shell(sid); subs.remove(sid); }
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceClosed { workspace_id: workspace_id.clone() }));
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::Value::Null)).await;
}
Cmd::SetWorkspaceMeta { workspace_id, name, group_id, unread, order } => {
let found = reg.workspace_mut(&workspace_id).map(|w| {
if let Some(n) = name { w.name = n; }
if let Some(g) = group_id { w.group_id = g; }
if let Some(u) = unread { w.unread = u; }
if let Some(o) = order { w.order = o; }
}).is_some();
if found {
if let Some(view) = reg.workspace_view(&workspace_id) {
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view }));
}
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::Value::Null)).await;
} else {
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await;
}
}
Cmd::CreateGroup { name, color } => {
let gid = reg.create_group(name, color);
broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() }));
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::json!({ "group_id": gid.0 }))).await;
}
Cmd::SetGroup { group_id, name, color, order } => {
let found = reg.group_mut(&group_id).map(|g| {
if let Some(n) = name { g.name = n; }
if let Some(c) = color { g.color = c; }
if let Some(o) = order { g.order = o; }
}).is_some();
if found {
broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() }));
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::Value::Null)).await;
} else {
let _ = out.send(err(id, "NOT_FOUND", "group")).await;
}
}
Cmd::DeleteGroup { group_id } => {
reg.delete_group(&group_id);
broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() }));
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::Value::Null)).await;
}
Cmd::Input { surface_id, bytes } => {
let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&bytes) else {
let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await; return;
};
if let Some(s) = reg.live(&surface_id) {
let _ = s.tx.send(crate::surface::SurfaceMsg::Input(decoded)).await;
let _ = out.send(ok(id, serde_json::Value::Null)).await;
} else {
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
}
}
Cmd::Resize { surface_id, cols, rows } => {
if let Some(s) = reg.live(&surface_id) {
let _ = s.tx.send(crate::surface::SurfaceMsg::Resize { cols, rows }).await;
let _ = out.send(ok(id, serde_json::Value::Null)).await;
} else {
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
}
}
Cmd::Attach { surface_id } => {
if let Some(s) = reg.live(&surface_id) {
let (reply_tx, reply_rx) = oneshot::channel();
if s.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.is_ok() {
if let Ok((snap, _sub)) = reply_rx.await {
subs.entry(surface_id.clone()).or_default().push(client);
let _ = out.send(ok(id, serde_json::json!({
"snapshot": snap.ansi, "cols": snap.cols, "rows": snap.rows,
"cursor_row": snap.cursor_row, "cursor_col": snap.cursor_col,
}))).await;
return;
}
}
let _ = out.send(err(id, "INTERNAL", "attach failed")).await;
} else {
// stopped panel: no live stream, return an empty snapshot so the GUI shows the restart overlay.
let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0, "stopped": true }))).await;
}
}
Cmd::Detach { surface_id } => {
if let Some(list) = subs.get_mut(&surface_id) { list.retain(|c| *c != client); }
let _ = out.send(ok(id, serde_json::Value::Null)).await;
}
Cmd::Focus { surface_id: _ } => { let _ = out.send(ok(id, serde_json::Value::Null)).await; }
Cmd::Close { surface_id } => {
if reg.surface_spec(&surface_id).is_some() {
if let Some(h) = reg.live(&surface_id) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; }
let ws_id = reg.workspace_of(&surface_id);
reg.remove_surface(&surface_id);
subs.remove(&surface_id);
crate::hooks::cleanup(&surface_id);
crate::hooks::cleanup_shell(&surface_id);
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() }));
if let Some(ws_id) = ws_id { emit_layout(reg, &ws_id, clients); }
persister.mark_dirty(reg.persist_state());
let _ = out.send(ok(id, serde_json::Value::Null)).await;
} else {
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
}
}
Cmd::SetState { surface_id, state } => {
if reg.is_running(&surface_id) {
reg.set_state(&surface_id, state);
broadcast_evt(clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state }));
let _ = out.send(ok(id, serde_json::Value::Null)).await;
} else {
// unknown or stopped surface — status is only meaningful while running.
let _ = out.send(err(id, "NOT_FOUND", "surface not running")).await;
}
}
Cmd::Status => {
let (groups, workspaces) = reg.status();
let _ = out.send(ok(id, serde_json::json!({ "groups": groups, "workspaces": workspaces }))).await;
}
Cmd::EventLog { limit } => {
// TODO(SP2-T4): wire to EventLog once the shared state is plumbed in.
let _ = limit;
let _ = out.send(err(id, "NOT_IMPLEMENTED", "event log not yet wired")).await;
}
Cmd::MarkRead { target } => {
// TODO(SP2-T4): wire to EventLog once the shared state is plumbed in.
let _ = target;
let _ = out.send(err(id, "NOT_IMPLEMENTED", "mark_read not yet wired")).await;
}
Cmd::Shutdown => {
let _ = out.send(ok(id, serde_json::Value::Null)).await;
std::process::exit(0);
}
}
}
/// Pump a surface's broadcast output into the router as `ServerMsg::Output`.
fn spawn_output_bridge(
surface_id: SurfaceId,
handle: &crate::surface::SurfaceHandle,
router_tx: mpsc::Sender<ServerMsg>,
) {
let tx = handle.tx.clone();
tokio::spawn(async move {
// Ask the actor for a subscription receiver.
let (reply_tx, reply_rx) = oneshot::channel();
if tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.is_err() {
return;
}
let Ok(mut sub) = reply_rx.await else { return };
loop {
match sub.recv().await {
Ok(bytes) => {
if router_tx.send(ServerMsg::Output { surface_id: surface_id.clone(), bytes }).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(_) => break, // surface closed
}
}
});
}
#[cfg(test)]
mod tests {
    //! End-to-end tests: each one boots a real server on a socket inside a fresh
    //! temp directory and talks to it over the wire protocol.
    use super::*;
    use base64::Engine;
    use std::path::PathBuf;

    /// Send `cmd` as request `id` and return the matching response.
    async fn req(stream: &mut UnixStream, id: u64, cmd: Cmd) -> Envelope {
        write_frame(stream, &Envelope::Req { id, cmd }).await.unwrap();
        // Read until we see the matching res (skip interleaved evts).
        loop {
            let env = read_frame(stream).await.unwrap().unwrap();
            if let Envelope::Res { id: rid, .. } = &env {
                if *rid == id {
                    return env;
                }
            }
        }
    }

    /// Payload of a response envelope; panics on any other envelope kind.
    fn res_data(env: &Envelope) -> &serde_json::Value {
        match env {
            Envelope::Res { data, .. } => data,
            _ => panic!("not a res"),
        }
    }

    /// Create a temp directory whose name is derived from the current time in nanoseconds.
    fn tempdir_path() -> PathBuf {
        let mut p = std::env::temp_dir();
        let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos();
        p.push(format!("spaceshd-test-{n}"));
        std::fs::create_dir_all(&p).unwrap();
        p
    }

    /// Poll until a connection to `sock` succeeds (up to 300 tries, 20 ms apart).
    async fn wait_for_socket(sock: &Path) {
        for _ in 0..300 {
            if UnixStream::connect(sock).await.is_ok() {
                return;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
        }
        panic!("socket never came up");
    }

    /// Start a server on `sock` backed by the JSON state file at `state_path`,
    /// and return once it accepts connections. The server task is left running.
    async fn start_server(sock: &Path, state_path: PathBuf) {
        let store: Arc<dyn StateStore> = Arc::new(crate::state_store::JsonStateStore::new(state_path));
        let sock_for_task = sock.to_path_buf();
        tokio::spawn(async move {
            let _ = serve(&sock_for_task, store).await;
        });
        wait_for_socket(sock).await;
    }

    /// Open the system temp dir as a workspace (request id 1) and return its id.
    async fn open_tmp_workspace(stream: &mut UnixStream) -> String {
        let r = req(stream, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
        res_data(&r)["workspace_id"].as_str().unwrap().to_string()
    }

    /// Spawn `/bin/sh -c <script>` as an 80x24 panel in workspace `ws` (request id 2)
    /// and return the new surface id.
    async fn new_sh_surface(stream: &mut UnixStream, ws: &str, script: &str) -> SurfaceId {
        let r = req(stream, 2, Cmd::NewSurface {
            workspace_id: WorkspaceId(ws.to_string()),
            command: Some("/bin/sh".into()),
            args: vec!["-c".into(), script.into()],
            cols: 80,
            rows: 24,
        }).await;
        SurfaceId(res_data(&r)["surface_id"].as_str().unwrap().to_string())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn open_new_surface_attach_streams_output() {
        let _serial = crate::test_support::serial();
        let dir = tempdir_path();
        let sock = dir.join("sock");
        start_server(&sock, dir.join("state.json")).await;
        let mut s = UnixStream::connect(&sock).await.unwrap();

        let ws = open_tmp_workspace(&mut s).await;
        let surface_id = new_sh_surface(&mut s, &ws, "printf STREAM_OK; sleep 0.5").await;
        let _ = req(&mut s, 3, Cmd::Attach { surface_id: surface_id.clone() }).await;

        // Now read frames looking for an Output evt containing STREAM_OK.
        let mut got = String::new();
        let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(6);
        while tokio::time::Instant::now() < deadline {
            if let Ok(Ok(Some(Envelope::Evt(Evt::Output { bytes, .. })))) =
                tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await
            {
                got.push_str(&String::from_utf8_lossy(&bytes));
                if got.contains("STREAM_OK") {
                    break;
                }
            }
        }
        assert!(got.contains("STREAM_OK"), "got: {got:?}");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn unknown_surface_returns_not_found() {
        let _serial = crate::test_support::serial();
        let dir = tempdir_path();
        let sock = dir.join("sock");
        start_server(&sock, dir.join("state.json")).await;
        let mut s = UnixStream::connect(&sock).await.unwrap();

        let r = req(&mut s, 1, Cmd::Input {
            surface_id: SurfaceId("s_nope".into()),
            bytes: base64::engine::general_purpose::STANDARD.encode(b"x"),
        }).await;
        match r {
            Envelope::Res { ok, error, .. } => {
                assert!(!ok);
                assert_eq!(error.unwrap().code, "NOT_FOUND");
            }
            _ => panic!(),
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn reattach_returns_snapshot_with_prior_output() {
        let _serial = crate::test_support::serial();
        let dir = tempdir_path();
        let sock = dir.join("sock");
        start_server(&sock, dir.join("state.json")).await;

        // First client: open, new surface that prints a marker, then disconnect.
        let surface_id;
        {
            let mut s = UnixStream::connect(&sock).await.unwrap();
            let ws = open_tmp_workspace(&mut s).await;
            surface_id = new_sh_surface(&mut s, &ws, "printf REPAINT_ME; sleep 2").await;
            // Give the actor time to flush output into the grid.
            tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
            // disconnect by dropping `s`
        }

        // Second client: attach to the same surface, expect snapshot to contain the marker.
        // Re-verify the socket is still up before connecting (handles any scheduling jitter).
        wait_for_socket(&sock).await;
        let mut s2 = UnixStream::connect(&sock).await.unwrap();
        let r = req(&mut s2, 1, Cmd::Attach { surface_id: surface_id.clone() }).await;
        let snap = res_data(&r)["snapshot"].as_str().unwrap();
        assert!(snap.contains("REPAINT_ME"), "snapshot was: {snap:?}");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn apply_preset_builds_tree_and_status_reports_it() {
        let _serial = crate::test_support::serial();
        let dir = tempdir_path();
        let sock = dir.join("sock");
        start_server(&sock, dir.join("state.json")).await;
        let mut s = UnixStream::connect(&sock).await.unwrap();

        let ws = open_tmp_workspace(&mut s).await;
        let r = req(&mut s, 2, Cmd::ApplyPreset {
            workspace_id: WorkspaceId(ws.clone()),
            preset_id: "2x2".into(),
            slots: vec![],
        }).await;
        let ids = res_data(&r)["surface_ids"].as_array().unwrap();
        assert_eq!(ids.len(), 4);

        let r = req(&mut s, 3, Cmd::Status).await;
        let wss = res_data(&r)["workspaces"].as_array().unwrap();
        let w0 = wss.iter().find(|w| w["id"] == ws).unwrap();
        assert!(w0["layout"].is_object(), "layout tree present");
        assert!(w0["layout"].to_string().contains("split"));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn set_state_updates_status_and_emits_event() {
        let _serial = crate::test_support::serial();
        let dir = tempdir_path();
        let sock = dir.join("sock");
        start_server(&sock, dir.join("state.json")).await;
        let mut s = UnixStream::connect(&sock).await.unwrap();

        let ws = open_tmp_workspace(&mut s).await;
        let surface_id = new_sh_surface(&mut s, &ws, "sleep 1").await;

        // set_state on the running surface
        let r = req(&mut s, 3, Cmd::SetState {
            surface_id: surface_id.clone(),
            state: spacesh_proto::status::SurfaceState::Work,
        }).await;
        assert!(matches!(r, Envelope::Res { ok: true, .. }));

        // status reflects it
        let r = req(&mut s, 4, Cmd::Status).await;
        let wss = res_data(&r)["workspaces"].as_array().unwrap();
        let w0 = wss.iter().find(|w| w["id"] == ws).unwrap();
        assert_eq!(w0["surfaces"][&surface_id.0]["state"], "work");

        // unknown surface -> NOT_FOUND
        let r = req(&mut s, 5, Cmd::SetState {
            surface_id: SurfaceId("s_nope".into()),
            state: spacesh_proto::status::SurfaceState::Done,
        }).await;
        match r {
            Envelope::Res { ok, error, .. } => {
                assert!(!ok);
                assert_eq!(error.unwrap().code, "NOT_FOUND");
            }
            _ => panic!(),
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn cold_restart_restores_structure_stopped() {
        let _serial = crate::test_support::serial();
        let dir = tempdir_path();
        let state_path = dir.join("state.json");
        let sock = dir.join("sock");
        let ws;
        {
            start_server(&sock, state_path.clone()).await;
            let mut s = UnixStream::connect(&sock).await.unwrap();
            ws = open_tmp_workspace(&mut s).await;
            req(&mut s, 2, Cmd::ApplyPreset {
                workspace_id: WorkspaceId(ws.clone()),
                preset_id: "2tb".into(),
                slots: vec![],
            }).await;
            // allow debounce (500ms) to flush state.json
            tokio::time::sleep(tokio::time::Duration::from_millis(900)).await;
        }

        // "cold start": new store on the same state file, new socket.
        let sock_b = dir.join("sock2");
        start_server(&sock_b, state_path.clone()).await;
        let mut s2 = UnixStream::connect(&sock_b).await.unwrap();
        let r = req(&mut s2, 1, Cmd::Status).await;
        let wss = res_data(&r)["workspaces"].as_array().unwrap();
        let w0 = wss.iter().find(|w| w["id"] == ws).expect("workspace restored");
        let surfaces = w0["surfaces"].as_object().unwrap();
        assert_eq!(surfaces.len(), 2, "2tb panels restored");
        for (_id, sv) in surfaces {
            assert_eq!(sv["running"], false, "restored panels are stopped");
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn osc133_in_pty_sets_status_over_socket() {
        let _serial = crate::test_support::serial();
        let dir = tempdir_path();
        let sock = dir.join("sock");
        start_server(&sock, dir.join("state.json")).await;
        let mut s = UnixStream::connect(&sock).await.unwrap();

        let ws = open_tmp_workspace(&mut s).await;
        let surface_id = new_sh_surface(
            &mut s,
            &ws,
            "printf '\\033]133;C\\007'; printf hi; printf '\\033]133;D;0\\007'; sleep 1",
        ).await;
        let _ = req(&mut s, 3, Cmd::Attach { surface_id }).await;

        // Wait for a State event to flow (Work then Done).
        let mut saw_done = false;
        let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
        while tokio::time::Instant::now() < deadline {
            if let Ok(Ok(Some(Envelope::Evt(Evt::State { state, .. })))) =
                tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await
            {
                if state == spacesh_proto::status::SurfaceState::Done {
                    saw_done = true;
                    break;
                }
            }
        }
        assert!(saw_done, "expected a Done state event from OSC 133");
    }
}