use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use base64::Engine; use spacesh_proto::codec::{read_frame, write_frame}; use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId, WorkspaceId}; use spacesh_proto::status::SurfaceState; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, oneshot}; use crate::event_log::EventLog; use crate::event_store::{self, EventPersister, EventStore}; use crate::persist::{self, Persister}; use crate::registry::Registry; use crate::state_store::StateStore; use crate::surface::{SurfaceMsg}; /// Per-client outbound channel: the router pushes envelopes the client task writes out. type ClientTx = mpsc::Sender; /// Messages into the single router task. enum ServerMsg { /// A request from a client; reply routed to that client's `out`. Request { id: u64, cmd: Cmd, client: ClientId, out: ClientTx }, /// Forward an output chunk to all subscribers of `surface_id`. Output { surface_id: SurfaceId, bytes: Vec }, /// A surface process exited. Exit { surface_id: SurfaceId, code: i32 }, /// Register a new client's event sink. ClientConnected { client: ClientId, out: ClientTx }, /// Drop a client and all its subscriptions. ClientDisconnected { client: ClientId }, /// A status change detected internally (OSC 133 / fallback) by a surface actor. StateDetected { surface_id: SurfaceId, state: SurfaceState }, } type ClientId = u64; pub async fn serve(socket: &Path, store: Arc, event_store: Arc) -> Result<()> { let listener = UnixListener::bind(socket)?; let (router_tx, router_rx) = mpsc::channel::(256); // Exit events from surfaces are funneled into the router. let (exit_tx, mut exit_rx) = mpsc::unbounded_channel::<(SurfaceId, i32)>(); let router_for_exit = router_tx.clone(); tokio::spawn(async move { while let Some((sid, code)) = exit_rx.recv().await { let _ = router_for_exit.send(ServerMsg::Exit { surface_id: sid, code }).await; } }); let (state_tx, mut state_rx) = mpsc::unbounded_channel::<(SurfaceId, SurfaceState)>(); let router_for_state = router_tx.clone(); tokio::spawn(async move { while let Some((sid, st)) = state_rx.recv().await { let _ = router_for_state.send(ServerMsg::StateDetected { surface_id: sid, state: st }).await; } }); let persister = persist::spawn(store.clone(), Duration::from_millis(500)); let initial = store.load().unwrap_or_default(); let event_persister = event_store::spawn(event_store.clone(), Duration::from_millis(500)); let event_initial = event_store.load().unwrap_or_default(); let started_at_ms = now_millis(); let shutdown = tokio::spawn(router( router_rx, router_tx.clone(), exit_tx, state_tx, persister, initial, event_persister, event_initial, started_at_ms, )); let mut next_client: ClientId = 0; loop { let (stream, _addr) = listener.accept().await?; let client_id = next_client; next_client += 1; let router_tx = router_tx.clone(); tokio::spawn(handle_client(stream, client_id, router_tx)); if shutdown.is_finished() { break; } } Ok(()) } async fn handle_client(stream: UnixStream, client_id: ClientId, router_tx: mpsc::Sender) { let (mut read_half, mut write_half) = stream.into_split(); let (out_tx, mut out_rx) = mpsc::channel::(256); let _ = router_tx .send(ServerMsg::ClientConnected { client: client_id, out: out_tx.clone() }) .await; // Writer task: drain outbound envelopes to the socket. let writer = tokio::spawn(async move { while let Some(env) = out_rx.recv().await { if write_frame(&mut write_half, &env).await.is_err() { break; } } }); // Reader loop: parse frames and forward requests to the router. loop { match read_frame(&mut read_half).await { Ok(Some(Envelope::Req { id, cmd })) => { let _ = router_tx .send(ServerMsg::Request { id, cmd, client: client_id, out: out_tx.clone() }) .await; } Ok(Some(_)) => { /* clients don't send res/evt; ignore */ } Ok(None) => break, // EOF Err(_) => break, } } let _ = router_tx.send(ServerMsg::ClientDisconnected { client: client_id }).await; writer.abort(); } async fn router( mut rx: mpsc::Receiver, router_tx: mpsc::Sender, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, persister: Persister, initial: crate::state_store::PersistState, event_persister: EventPersister, event_initial: crate::event_log::EventLogState, started_at_ms: u64, ) { let mut reg = Registry::new(); reg.restore(initial); let mut event_log = EventLog::restore(event_initial, 1000); let mut clients: HashMap = HashMap::new(); // surface_id → set of client ids subscribed (attached). let mut subs: HashMap> = HashMap::new(); let mut config = crate::config::Config::load(); while let Some(msg) = rx.recv().await { match msg { ServerMsg::ClientConnected { client, out } => { clients.insert(client, out); } ServerMsg::ClientDisconnected { client } => { clients.remove(&client); for list in subs.values_mut() { list.retain(|c| *c != client); } } ServerMsg::Output { surface_id, bytes } => { if let Some(list) = subs.get(&surface_id) { let evt = Envelope::Evt(Evt::Output { surface_id: surface_id.clone(), bytes }); for c in list { if let Some(out) = clients.get(c) { let _ = out.try_send(evt.clone()); } } } } ServerMsg::Exit { surface_id, code } => { reg.mark_stopped(&surface_id); reg.drop_state(&surface_id); record_event(®, &mut event_log, &event_persister, &clients, &surface_id, spacesh_proto::event::EventKind::Exit); let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code }); broadcast_evt(&clients, &evt); } ServerMsg::StateDetected { surface_id, state } => { if reg.is_running(&surface_id) { reg.set_state(&surface_id, state); broadcast_evt(&clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state })); if let Some(kind) = kind_for_state(state) { record_event(®, &mut event_log, &event_persister, &clients, &surface_id, kind); } } } ServerMsg::Request { id, cmd, client, out } => { handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &state_tx, &persister, &mut event_log, &event_persister, started_at_ms, &mut config).await; } } } } fn broadcast_evt(clients: &HashMap, evt: &Envelope) { for out in clients.values() { let _ = out.try_send(evt.clone()); } } /// Current unix-epoch milliseconds. `as u64` is safe — epoch millis fit u64 for ~584M years. fn now_millis() -> u64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_millis() as u64) .unwrap_or(0) } /// Which state transitions are worth logging. work/idle are noise → None. fn kind_for_state(state: SurfaceState) -> Option { use spacesh_proto::event::EventKind; match state { SurfaceState::Done => Some(EventKind::Done), SurfaceState::Wait => Some(EventKind::Wait), SurfaceState::Error => Some(EventKind::Error), SurfaceState::Work | SurfaceState::Idle => None, } } /// Record one event (denormalizing workspace name + agent label), persist, broadcast. fn record_event( reg: &Registry, log: &mut EventLog, persister: &EventPersister, clients: &HashMap, sid: &SurfaceId, kind: spacesh_proto::event::EventKind, ) { // No workspace → the surface was already removed (user-initiated Close / ApplyPreset / // CloseWorkspace remove it synchronously before the async Exit arrives). Such deliberate // closes are intentionally NOT logged — only spontaneous process exits and status // transitions become events. let Some(ws_id) = reg.workspace_of(sid) else { return }; let ws_name = reg.workspace(&ws_id).map(|w| w.name.clone()).unwrap_or_default(); let agent = reg.surface_spec(sid).and_then(|s| s.agent_label); let rec = log.record(sid.clone(), ws_id, ws_name, agent, kind, now_millis()); persister.mark_dirty(log.snapshot()); broadcast_evt(clients, &Envelope::Evt(Evt::Event { record: rec })); } fn ok(id: u64, data: serde_json::Value) -> Envelope { Envelope::Res { id, ok: true, data, error: None } } fn err(id: u64, code: &str, msg: &str) -> Envelope { Envelope::Res { id, ok: false, data: serde_json::Value::Null, error: Some(ErrorBody { code: code.into(), msg: msg.into() }) } } /// Compute spawn env (hooks for claude agents, zsh integration for zsh shells) /// and whether a deterministic hook source is active. fn spawn_env(sid: &SurfaceId, spec: &spacesh_proto::workspace::SurfaceSpec) -> (Vec<(String, String)>, bool) { let (mut env, active) = if crate::hooks::is_agent(&spec.command, spec.agent_label.as_deref()) { let env = crate::hooks::prepare(sid, &crate::hooks::spacesh_bin()); let active = !env.is_empty(); (env, active) } else if crate::hooks::is_zsh(&spec.command) { (crate::hooks::shell_env(sid), false) } else { (vec![], false) }; // Ensure the child sees the user's full PATH; the GUI/launchd-launched daemon // otherwise can't find agents (claude/codex/gemini) and the panel exits at once. if !env.iter().any(|(k, _)| k == "PATH") { env.push(("PATH".to_string(), crate::config::enriched_path())); } (env, active) } /// Emit a `layout_changed` event for a workspace's current tree. fn emit_layout(reg: &Registry, ws_id: &WorkspaceId, clients: &HashMap) { if let Some(w) = reg.workspace(ws_id) { broadcast_evt(clients, &Envelope::Evt(Evt::LayoutChanged { workspace_id: ws_id.clone(), layout: w.layout.clone(), })); } } #[allow(clippy::too_many_arguments)] async fn handle_request( id: u64, cmd: Cmd, client: ClientId, out: ClientTx, reg: &mut Registry, subs: &mut HashMap>, clients: &HashMap, router_tx: &mpsc::Sender, exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>, state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, persister: &Persister, event_log: &mut EventLog, event_persister: &EventPersister, started_at_ms: u64, config: &mut crate::config::Config, ) { use spacesh_proto::message::SplitDir; use spacesh_proto::layout::{LayoutNode, Orient}; use spacesh_proto::workspace::SurfaceSpec; match cmd { Cmd::Open { path } => { let (ws_id, created) = reg.open_workspace(path.into()); if created { if let Some(view) = reg.workspace_view(&ws_id) { broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view })); } persister.mark_dirty(reg.persist_state()); } let _ = out.send(ok(id, serde_json::json!({ "workspace_id": ws_id.0 }))).await; } Cmd::NewSurface { workspace_id, command, args, cols, rows } => { let Some(ws) = reg.workspace(&workspace_id).cloned() else { let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return; }; let sid = reg.new_surface_id(); let shell = command.clone().unwrap_or_else(|| config.resolved_shell()); let spec = SurfaceSpec { command: shell, args: args.clone(), cwd: ws.path.clone(), agent_label: command, cols, rows, autostart: false, }; let (env, hooks_active) = spawn_env(&sid, &spec); match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { Ok(handle) => { spawn_output_bridge(sid.clone(), &handle, router_tx.clone()); reg.set_live(handle); reg.set_state(&sid, spacesh_proto::SurfaceState::Idle); reg.add_surface_spec(&workspace_id, sid.clone(), spec); // First panel of an empty workspace becomes the root leaf. if let Some(w) = reg.workspace_mut(&workspace_id) { if w.layout.is_none() { w.layout = Some(LayoutNode::leaf(sid.clone())); } } broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: sid.clone(), workspace_id: workspace_id.clone(), })); emit_layout(reg, &workspace_id, clients); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::json!({ "surface_id": sid.0 }))).await; } Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; } } } Cmd::SplitSurface { surface_id, dir, command, args } => { let Some(ws_id) = reg.workspace_of(&surface_id) else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return; }; let ws = reg.workspace(&ws_id).cloned().unwrap(); let new_sid = reg.new_surface_id(); let shell = command.clone().unwrap_or_else(|| config.resolved_shell()); let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false }; let (env, hooks_active) = spawn_env(&new_sid, &spec); match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { Ok(handle) => { spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone()); reg.set_live(handle); reg.set_state(&new_sid, spacesh_proto::SurfaceState::Idle); reg.add_surface_spec(&ws_id, new_sid.clone(), spec); let orient = match dir { SplitDir::Right => Orient::H, SplitDir::Down => Orient::V }; if let Some(w) = reg.workspace_mut(&ws_id) { let mut root = w.layout.take().unwrap_or_else(|| LayoutNode::leaf(surface_id.clone())); spacesh_core::ops::split_leaf(&mut root, &surface_id, orient, true, new_sid.clone()); w.layout = Some(root); } broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: new_sid.clone(), workspace_id: ws_id.clone() })); emit_layout(reg, &ws_id, clients); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::json!({ "surface_id": new_sid.0 }))).await; } Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; } } } Cmd::SetRatios { workspace_id, node_path, ratios } => { let ok_set = reg.workspace_mut(&workspace_id).map(|w| { if let Some(l) = w.layout.as_mut() { spacesh_core::ops::set_ratios(l, &node_path, &ratios) } else { false } }).unwrap_or(false); if ok_set { emit_layout(reg, &workspace_id, clients); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "BAD_REQUEST", "invalid node_path or ratios")).await; } } Cmd::MoveSurface { surface_id, target_surface_id, edge } => { let Some(ws_id) = reg.workspace_of(&surface_id) else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return; }; if let Some(w) = reg.workspace_mut(&ws_id) { if let Some(root) = w.layout.take() { w.layout = Some(spacesh_core::ops::move_leaf(root, &surface_id, &target_surface_id, edge)); } } emit_layout(reg, &ws_id, clients); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::ApplyPreset { workspace_id, preset_id, slots } => { let Some(count) = spacesh_core::presets::slot_count(&preset_id) else { let _ = out.send(err(id, "BAD_REQUEST", "unknown preset")).await; return; }; let Some(ws) = reg.workspace(&workspace_id).cloned() else { let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return; }; // Kill current panels of this workspace. let existing: Vec = ws.surfaces.keys().cloned().collect(); for sid in &existing { if let Some(h) = reg.live(sid) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; } reg.remove_surface(sid); subs.remove(sid); } // Spawn `count` panels (slots padded/truncated to count). let mut new_ids = Vec::new(); for i in 0..count { let slot = slots.get(i); let new_sid = reg.new_surface_id(); let command = slot.and_then(|s| s.command.clone()); let shell = command.clone().unwrap_or_else(|| config.resolved_shell()); let args = slot.map(|s| s.args.clone()).unwrap_or_default(); let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false }; let (env, hooks_active) = spawn_env(&new_sid, &spec); match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { Ok(handle) => { spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone()); reg.set_live(handle); reg.set_state(&new_sid, spacesh_proto::SurfaceState::Idle); reg.add_surface_spec(&workspace_id, new_sid.clone(), spec); new_ids.push(new_sid); } Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; return; } } } if let Some(tree) = spacesh_core::presets::build(&preset_id, &new_ids) { if let Some(w) = reg.workspace_mut(&workspace_id) { w.layout = Some(tree); } } for sid in &new_ids { broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: sid.clone(), workspace_id: workspace_id.clone() })); } emit_layout(reg, &workspace_id, clients); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::json!({ "surface_ids": new_ids.iter().map(|s| s.0.clone()).collect::>() }))).await; } Cmd::RestartSurface { surface_id } => { if reg.is_running(&surface_id) { let _ = out.send(ok(id, serde_json::Value::Null)).await; return; // already running } let Some(spec) = reg.surface_spec(&surface_id) else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return; }; let ws_id = reg.workspace_of(&surface_id).unwrap(); let (env, hooks_active) = spawn_env(&surface_id, &spec); match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { Ok(handle) => { spawn_output_bridge(surface_id.clone(), &handle, router_tx.clone()); reg.set_live(handle); reg.set_state(&surface_id, spacesh_proto::SurfaceState::Idle); broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceRestarted { surface_id: surface_id.clone() })); let _ = out.send(ok(id, serde_json::Value::Null)).await; } Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; } } } Cmd::CloseWorkspace { workspace_id } => { let ids = reg.close_workspace(&workspace_id); for sid in &ids { crate::hooks::cleanup(sid); crate::hooks::cleanup_shell(sid); subs.remove(sid); } broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceClosed { workspace_id: workspace_id.clone() })); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::SetWorkspaceMeta { workspace_id, name, group_id, unread, order, pinned } => { let found = reg.workspace_mut(&workspace_id).map(|w| { if let Some(n) = name { w.name = n; } if let Some(g) = group_id { w.group_id = g; } if let Some(u) = unread { w.unread = u; } if let Some(o) = order { w.order = o; } if let Some(p) = pinned { w.pinned = p; } }).is_some(); if found { if let Some(view) = reg.workspace_view(&workspace_id) { broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view })); } persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; } } Cmd::CreateGroup { name, color } => { let gid = reg.create_group(name, color); broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() })); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::json!({ "group_id": gid.0 }))).await; } Cmd::SetGroup { group_id, name, color, order } => { let found = reg.group_mut(&group_id).map(|g| { if let Some(n) = name { g.name = n; } if let Some(c) = color { g.color = c; } if let Some(o) = order { g.order = o; } }).is_some(); if found { broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() })); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "group")).await; } } Cmd::DeleteGroup { group_id } => { reg.delete_group(&group_id); broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() })); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::Input { surface_id, bytes } => { let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&bytes) else { let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await; return; }; if let Some(s) = reg.live(&surface_id) { let _ = s.tx.send(crate::surface::SurfaceMsg::Input(decoded)).await; let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } Cmd::Resize { surface_id, cols, rows } => { if let Some(s) = reg.live(&surface_id) { let _ = s.tx.send(crate::surface::SurfaceMsg::Resize { cols, rows }).await; let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } Cmd::Attach { surface_id } => { if let Some(s) = reg.live(&surface_id) { let (reply_tx, reply_rx) = oneshot::channel(); if s.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.is_ok() { if let Ok((snap, _sub)) = reply_rx.await { subs.entry(surface_id.clone()).or_default().push(client); let _ = out.send(ok(id, serde_json::json!({ "snapshot": snap.ansi, "cols": snap.cols, "rows": snap.rows, "cursor_row": snap.cursor_row, "cursor_col": snap.cursor_col, }))).await; return; } } let _ = out.send(err(id, "INTERNAL", "attach failed")).await; } else { // stopped panel: no live stream, return an empty snapshot so the GUI shows the restart overlay. let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0, "stopped": true }))).await; } } Cmd::Detach { surface_id } => { if let Some(list) = subs.get_mut(&surface_id) { list.retain(|c| *c != client); } let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::Focus { surface_id } => { let ids = event_log.mark_read(&spacesh_proto::event::MarkReadTarget::Surface(surface_id)); if !ids.is_empty() { event_persister.mark_dirty(event_log.snapshot()); broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids })); } let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::Close { surface_id } => { if reg.surface_spec(&surface_id).is_some() { if let Some(h) = reg.live(&surface_id) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; } let ws_id = reg.workspace_of(&surface_id); reg.remove_surface(&surface_id); subs.remove(&surface_id); crate::hooks::cleanup(&surface_id); crate::hooks::cleanup_shell(&surface_id); broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() })); if let Some(ws_id) = ws_id { emit_layout(reg, &ws_id, clients); if let Some(view) = reg.workspace_view(&ws_id) { broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view })); } } persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } Cmd::SetState { surface_id, state } => { if reg.is_running(&surface_id) { reg.set_state(&surface_id, state); broadcast_evt(clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state })); if let Some(kind) = kind_for_state(state) { record_event(reg, event_log, event_persister, clients, &surface_id, kind); } let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { // unknown or stopped surface — status is only meaningful while running. let _ = out.send(err(id, "NOT_FOUND", "surface not running")).await; } } Cmd::SetZoom { workspace_id, surface_id } => { let Some(w) = reg.workspace(&workspace_id) else { let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return; }; if let Some(sid) = &surface_id { if !w.surfaces.contains_key(sid) { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return; } } reg.workspace_mut(&workspace_id).expect("workspace validated above").zoomed = surface_id.clone(); if let Some(view) = reg.workspace_view(&workspace_id) { broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view })); } persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::Health => { let _ = out.send(ok(id, serde_json::json!({ "version": env!("CARGO_PKG_VERSION"), "build": option_env!("SPACESH_BUILD").unwrap_or("dev"), "pid": std::process::id(), "started_at_ms": started_at_ms, }))).await; } Cmd::WhichAgents { candidates } => { let available: Vec = candidates.into_iter().filter(|c| crate::config::is_installed(c)).collect(); let _ = out.send(ok(id, serde_json::json!({ "available": available }))).await; } Cmd::Status => { let (groups, workspaces) = reg.status(); let _ = out.send(ok(id, serde_json::json!({ "groups": groups, "workspaces": workspaces }))).await; } Cmd::EventLog { limit } => { let events = event_log.recent(limit); let unread = event_log.unread_count(); let _ = out.send(ok(id, serde_json::json!({ "events": events, "unread": unread }))).await; } Cmd::MarkRead { target } => { let ids = event_log.mark_read(&target); if !ids.is_empty() { event_persister.mark_dirty(event_log.snapshot()); broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids })); } let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::ClearEvents => { event_log.clear(); event_persister.mark_dirty(event_log.snapshot()); broadcast_evt(clients, &Envelope::Evt(Evt::EventsCleared)); let _ = out.send(ok(id, serde_json::Value::Null)).await; } Cmd::Shutdown => { let _ = out.send(ok(id, serde_json::Value::Null)).await; std::process::exit(0); } Cmd::GetConfig => { match serde_json::to_value(config.to_view()) { Ok(v) => { let _ = out.send(ok(id, v)).await; } Err(e) => { let _ = out.send(err(id, "INTERNAL", &e.to_string())).await; } } } Cmd::SetConfig { default_shell, font_family, font_size, theme, accent } => { if let Some(v) = &theme { if v != "dark" && v != "light" { let _ = out.send(err(id, "BAD_CONFIG", "theme")).await; return; } } if let Some(v) = &accent { const ACCENTS: [&str; 5] = ["blue", "teal", "purple", "green", "orange"]; if !ACCENTS.contains(&v.as_str()) { let _ = out.send(err(id, "BAD_CONFIG", "accent")).await; return; } } let mut next = config.clone(); if let Some(v) = default_shell { next.default_shell = if v.is_empty() { None } else { Some(v) }; } if let Some(v) = font_family { next.terminal.font_family = if v.is_empty() { None } else { Some(v) }; } if let Some(v) = font_size { next.terminal.font_size = Some(v.clamp(10, 20)); } if let Some(v) = theme { next.appearance.theme = Some(v); } if let Some(v) = accent { next.appearance.accent = Some(v); } let to_save = next.clone(); match tokio::task::spawn_blocking(move || to_save.save()).await { Ok(Ok(())) => { *config = next; let view = config.to_view(); broadcast_evt(clients, &Envelope::Evt(Evt::ConfigChanged { config: view })); let _ = out.send(ok(id, serde_json::Value::Null)).await; } Ok(Err(e)) => { let _ = out.send(err(id, "SAVE_FAILED", &e.to_string())).await; } Err(e) => { let _ = out.send(err(id, "SAVE_FAILED", &e.to_string())).await; } } } } } /// Pump a surface's broadcast output into the router as `ServerMsg::Output`. fn spawn_output_bridge( surface_id: SurfaceId, handle: &crate::surface::SurfaceHandle, router_tx: mpsc::Sender, ) { let tx = handle.tx.clone(); tokio::spawn(async move { // Ask the actor for a subscription receiver. let (reply_tx, reply_rx) = oneshot::channel(); if tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.is_err() { return; } let Ok(mut sub) = reply_rx.await else { return }; loop { match sub.recv().await { Ok(bytes) => { if router_tx.send(ServerMsg::Output { surface_id: surface_id.clone(), bytes }).await.is_err() { break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, Err(_) => break, // surface closed } } }); } #[cfg(test)] mod tests { use super::*; use base64::Engine; async fn req(stream: &mut UnixStream, id: u64, cmd: Cmd) -> Envelope { write_frame(stream, &Envelope::Req { id, cmd }).await.unwrap(); // Read until we see the matching res (skip interleaved evts). loop { let env = read_frame(stream).await.unwrap().unwrap(); if let Envelope::Res { id: rid, .. } = &env { if *rid == id { return env; } } } } fn res_data(env: &Envelope) -> &serde_json::Value { match env { Envelope::Res { data, .. } => data, _ => panic!("not a res") } } fn tempdir_path() -> std::path::PathBuf { let mut p = std::env::temp_dir(); let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos(); p.push(format!("spaceshd-test-{n}")); std::fs::create_dir_all(&p).unwrap(); p } /// Build an event store whose file lives inside the per-test temp dir so it is /// cleaned up with the rest of the test fixtures (not left in the global temp root). fn make_event_store(dir: &Path) -> std::sync::Arc { std::sync::Arc::new(crate::event_store::JsonEventStore::new(dir.join("events.json"))) } async fn wait_for_socket(sock: &Path) { for _ in 0..300 { if UnixStream::connect(sock).await.is_ok() { return; } tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; } panic!("socket never came up"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn open_new_surface_attach_streams_output() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws), command: Some("/bin/sh".into()), args: vec!["-c".into(), "printf STREAM_OK; sleep 0.5".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); let surface_id = spacesh_proto::SurfaceId(sid); let _ = req(&mut s, 3, Cmd::Attach { surface_id: surface_id.clone() }).await; // Now read frames looking for an Output evt containing STREAM_OK. let mut got = String::new(); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(6); while tokio::time::Instant::now() < deadline { if let Ok(Ok(Some(Envelope::Evt(Evt::Output { bytes, .. })))) = tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await { got.push_str(&String::from_utf8_lossy(&bytes)); if got.contains("STREAM_OK") { break; } } } assert!(got.contains("STREAM_OK"), "got: {got:?}"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unknown_surface_returns_not_found() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Input { surface_id: spacesh_proto::SurfaceId("s_nope".into()), bytes: base64::engine::general_purpose::STANDARD.encode(b"x"), }).await; match r { Envelope::Res { ok, error, .. } => { assert!(!ok); assert_eq!(error.unwrap().code, "NOT_FOUND"); } _ => panic!(), } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn reattach_returns_snapshot_with_prior_output() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; // First client: open, new surface that prints a marker, attach, then disconnect. let surface_id; { let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws), command: Some("/bin/sh".into()), args: vec!["-c".into(), "printf REPAINT_ME; sleep 2".into()], cols: 80, rows: 24, }).await; surface_id = spacesh_proto::SurfaceId(res_data(&r)["surface_id"].as_str().unwrap().to_string()); // Give the actor time to flush output into the grid. tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; // disconnect by dropping `s` } // Second client: attach to the same surface, expect snapshot to contain the marker. // Re-verify the socket is still up before connecting (handles any scheduling jitter). wait_for_socket(&sock).await; let mut s2 = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s2, 1, Cmd::Attach { surface_id: surface_id.clone() }).await; let snap = res_data(&r)["snapshot"].as_str().unwrap(); assert!(snap.contains("REPAINT_ME"), "snapshot was: {snap:?}"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn apply_preset_builds_tree_and_status_reports_it() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock2 = sock.clone(); tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::ApplyPreset { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), preset_id: "2x2".into(), slots: vec![], }).await; let ids = res_data(&r)["surface_ids"].as_array().unwrap(); assert_eq!(ids.len(), 4); let r = req(&mut s, 3, Cmd::Status).await; let wss = res_data(&r)["workspaces"].as_array().unwrap(); let w0 = wss.iter().find(|w| w["id"] == ws).unwrap(); assert!(w0["layout"].is_object(), "layout tree present"); assert!(w0["layout"].to_string().contains("split")); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn set_state_updates_status_and_emits_event() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock2 = sock.clone(); tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 1".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); let surface_id = spacesh_proto::SurfaceId(sid.clone()); // set_state on the running surface let r = req(&mut s, 3, Cmd::SetState { surface_id: surface_id.clone(), state: spacesh_proto::status::SurfaceState::Work }).await; assert!(matches!(r, Envelope::Res { ok: true, .. })); // status reflects it let r = req(&mut s, 4, Cmd::Status).await; let wss = res_data(&r)["workspaces"].as_array().unwrap(); let w0 = wss.iter().find(|w| w["id"] == ws).unwrap(); assert_eq!(w0["surfaces"][&sid]["state"], "work"); // unknown surface -> NOT_FOUND let r = req(&mut s, 5, Cmd::SetState { surface_id: spacesh_proto::SurfaceId("s_nope".into()), state: spacesh_proto::status::SurfaceState::Done }).await; match r { Envelope::Res { ok, error, .. } => { assert!(!ok); assert_eq!(error.unwrap().code, "NOT_FOUND"); }, _ => panic!() } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn cold_restart_restores_structure_stopped() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let state_path = dir.join("state.json"); let sock = dir.join("sock"); let ws; { let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); // Both daemon instances in this test share ONE event-store file under the // per-test dir so instance B reads from disk what instance A persisted. let event_store = make_event_store(&dir); let sock2 = sock.clone(); tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); req(&mut s, 2, Cmd::ApplyPreset { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), preset_id: "2tb".into(), slots: vec![], }).await; // allow debounce (500ms) to flush state.json tokio::time::sleep(tokio::time::Duration::from_millis(900)).await; } // "cold start": new store on the same state file, new socket. let sock_b = dir.join("sock2"); let store_b: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); let event_store_b = make_event_store(&dir); let sb2 = sock_b.clone(); tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; }); wait_for_socket(&sb2).await; let mut s2 = UnixStream::connect(&sb2).await.unwrap(); let r = req(&mut s2, 1, Cmd::Status).await; let wss = res_data(&r)["workspaces"].as_array().unwrap(); let w0 = wss.iter().find(|w| w["id"] == ws).expect("workspace restored"); let surfaces = w0["surfaces"].as_object().unwrap(); assert_eq!(surfaces.len(), 2, "2tb panels restored"); for (_id, sv) in surfaces { assert_eq!(sv["running"], false, "restored panels are stopped"); } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn osc133_in_pty_sets_status_over_socket() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock2 = sock.clone(); tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), command: Some("/bin/sh".into()), args: vec!["-c".into(), "printf '\\033]133;C\\007'; printf hi; printf '\\033]133;D;0\\007'; sleep 1".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); let surface_id = spacesh_proto::SurfaceId(sid.clone()); let _ = req(&mut s, 3, Cmd::Attach { surface_id }).await; // Wait for a State event to flow (Work then Done). let mut saw_done = false; let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); while tokio::time::Instant::now() < deadline { if let Ok(Ok(Some(Envelope::Evt(Evt::State { state, .. })))) = tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await { if state == spacesh_proto::status::SurfaceState::Done { saw_done = true; break; } } } assert!(saw_done, "expected a Done state event from OSC 133"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn set_state_done_emits_event_record() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; // Control connection: open workspace and spawn surface. let mut ctrl = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut ctrl, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 5".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); // Observer connection: a second client that receives all broadcast events // without its read loop consuming them via req(). let mut observer = UnixStream::connect(&sock).await.unwrap(); // Trigger Done via SetState on the control connection. let _ = req(&mut ctrl, 3, Cmd::SetState { surface_id: spacesh_proto::SurfaceId(sid.clone()), state: spacesh_proto::status::SurfaceState::Done, }).await; // Expect an Evt::Event for this surface on the observer within a short window. let mut found = None; let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); while tokio::time::Instant::now() < deadline { if let Ok(Ok(Some(env))) = tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { if let Envelope::Evt(Evt::Event { record }) = env { if record.surface_id.0 == sid { found = Some(record); break; } } } } let rec = found.expect("expected an Evt::Event for the surface"); assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done); assert!(!rec.read); assert_eq!(rec.workspace_id.0, ws); assert!(!rec.workspace_name.is_empty(), "workspace name should be denormalized into the record"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn close_does_not_emit_event_record() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; // Control connection: open workspace and spawn surface. let mut ctrl = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut ctrl, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 5".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); // Observer connection: receives all broadcast events. let mut observer = UnixStream::connect(&sock).await.unwrap(); // User-initiated Close on the control connection. let _ = req(&mut ctrl, 3, Cmd::Close { surface_id: spacesh_proto::SurfaceId(sid.clone()), }).await; // A deliberate Close must surface an Evt::Exit but NEVER an Evt::Event for it. let mut saw_exit = false; let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2); while tokio::time::Instant::now() < deadline { if let Ok(Ok(Some(env))) = tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { match env { Envelope::Evt(Evt::Event { record }) if record.surface_id.0 == sid => { panic!("user-initiated Close must not produce an Evt::Event"); } Envelope::Evt(Evt::Exit { surface_id, .. }) if surface_id.0 == sid => { saw_exit = true; } _ => {} } } } assert!(saw_exit, "expected an Evt::Exit for the closed surface"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn osc133_detected_state_emits_event_record() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock2 = sock.clone(); tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; // Control connection: open workspace and spawn a surface that emits OSC 133. let mut ctrl = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut ctrl, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), command: Some("/bin/sh".into()), args: vec!["-c".into(), "printf '\\033]133;C\\007'; printf hi; printf '\\033]133;D;0\\007'; sleep 1".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); // Observer connection: receives all broadcast events (the detected-state path // flows through ServerMsg::StateDetected → record_event → Evt::Event). let mut observer = UnixStream::connect(&sock).await.unwrap(); // Drive the PTY output by attaching the control connection. let _ = req(&mut ctrl, 3, Cmd::Attach { surface_id: spacesh_proto::SurfaceId(sid.clone()), }).await; // Expect an Evt::Event (kind=done) for this surface from the OSC 133 Done detection. let mut found = None; let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); while tokio::time::Instant::now() < deadline { if let Ok(Ok(Some(env))) = tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { if let Envelope::Evt(Evt::Event { record }) = env { if record.surface_id.0 == sid { found = Some(record); break; } } } } let rec = found.expect("expected an Evt::Event from the OSC 133 detected state"); assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done); assert!(!rec.read); assert_eq!(rec.workspace_id.0, ws); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn event_log_query_and_mark_read() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; // Observer connection to catch the EventsRead broadcast. let mut obs = UnixStream::connect(&sock).await.unwrap(); // Control connection. let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws), command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 5".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); let _ = req(&mut s, 3, Cmd::SetState { surface_id: spacesh_proto::SurfaceId(sid.clone()), state: spacesh_proto::status::SurfaceState::Error, }).await; // Query the log. let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await; let data = res_data(&log); assert_eq!(data["unread"].as_u64().unwrap(), 1); let first_id = data["events"][0]["id"].as_u64().unwrap(); // Mark it read by id. let _ = req(&mut s, 5, Cmd::MarkRead { target: spacesh_proto::event::MarkReadTarget::Ids(vec![first_id]), }).await; // Observer should see EventsRead { ids: [first_id] }. let mut saw_read = false; let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); while tokio::time::Instant::now() < deadline { if let Ok(Ok(Some(env))) = tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await { if let Envelope::Evt(Evt::EventsRead { ids }) = env { if ids == vec![first_id] { saw_read = true; break; } } } } assert!(saw_read, "expected an EventsRead broadcast for the marked id"); // Unread is now 0. let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await; assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn event_log_persists_across_daemon_restart() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let state_path = dir.join("state.json"); let sock = dir.join("sock"); let event_id: u64; let ws_id: String; // ── Instance A ──────────────────────────────────────────────────────── { let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); let event_store = make_event_store(&dir); let sock2 = sock.clone(); tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); // Open workspace, spawn surface. let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; ws_id = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws_id.clone()), command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 5".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); // Drive an Error state → one unread event is logged. let _ = req(&mut s, 3, Cmd::SetState { surface_id: spacesh_proto::SurfaceId(sid.clone()), state: spacesh_proto::status::SurfaceState::Error, }).await; // Query and assert unread == 1 before restart. let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await; let data = res_data(&log); assert_eq!(data["unread"].as_u64().unwrap(), 1, "instance A: expected 1 unread event"); assert_eq!(data["events"][0]["kind"].as_str().unwrap(), "error"); assert_eq!(data["events"][0]["workspace_id"].as_str().unwrap(), ws_id); event_id = data["events"][0]["id"].as_u64().unwrap(); // Wait comfortably longer than the 500 ms debounce so events.json is flushed. tokio::time::sleep(tokio::time::Duration::from_millis(900)).await; // Drop `s` (and instance A's task) by falling out of scope. } // ── Instance B (same dir, fresh socket path) ────────────────────────── let sock_b = dir.join("sock2"); let store_b: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); let event_store_b = make_event_store(&dir); let sb2 = sock_b.clone(); tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; }); wait_for_socket(&sb2).await; let mut s2 = UnixStream::connect(&sb2).await.unwrap(); // Query event log on instance B — the persisted event must survive the restart. let log = req(&mut s2, 1, Cmd::EventLog { limit: None }).await; let data = res_data(&log); assert_eq!(data["unread"].as_u64().unwrap(), 1, "instance B: event log unread count must survive cold restart"); assert_eq!(data["events"][0]["id"].as_u64().unwrap(), event_id, "instance B: event id must match"); assert_eq!(data["events"][0]["kind"].as_str().unwrap(), "error", "instance B: event kind must be 'error'"); assert_eq!(data["events"][0]["workspace_id"].as_str().unwrap(), ws_id, "instance B: workspace_id must match"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn focus_marks_surface_events_read() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; // Observer connection. let mut obs = UnixStream::connect(&sock).await.unwrap(); // Control connection. let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws), command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 5".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); // Drive an Error event for this surface. let _ = req(&mut s, 3, Cmd::SetState { surface_id: spacesh_proto::SurfaceId(sid.clone()), state: spacesh_proto::status::SurfaceState::Error, }).await; // Verify unread == 1 before Focus. let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await; assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 1); let first_id = res_data(&log)["events"][0]["id"].as_u64().unwrap(); // Focus that surface — should mark its events read. let _ = req(&mut s, 5, Cmd::Focus { surface_id: spacesh_proto::SurfaceId(sid.clone()), }).await; // Observer should receive EventsRead with the id from the Error event. let mut saw_read = false; let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); while tokio::time::Instant::now() < deadline { if let Ok(Ok(Some(env))) = tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await { if let Envelope::Evt(Evt::EventsRead { ids }) = env { if ids.contains(&first_id) { saw_read = true; break; } } } } assert!(saw_read, "expected an EventsRead broadcast after Focus"); // Unread drops to 0. let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await; assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn health_reports_version_pid_started() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64; let r = req(&mut s, 1, Cmd::Health).await; let d = res_data(&r); assert!(!d["version"].as_str().unwrap().is_empty()); assert!(d["pid"].as_u64().unwrap() > 0); let started = d["started_at_ms"].as_u64().unwrap(); assert!(started > 0 && started >= now.saturating_sub(5000) && started <= now + 1000, "started_at_ms plausible: {started} vs now {now}"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn set_zoom_sets_and_clears_and_autoclears() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut s, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 5".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); let _ = req(&mut s, 3, Cmd::SetZoom { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), surface_id: Some(spacesh_proto::SurfaceId(sid.clone())), }).await; let st = req(&mut s, 4, Cmd::Status).await; let w0 = res_data(&st)["workspaces"].as_array().unwrap().iter().find(|w| w["id"] == ws).unwrap().clone(); assert_eq!(w0["zoomed"], sid); let _ = req(&mut s, 5, Cmd::SetZoom { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), surface_id: None, }).await; let st = req(&mut s, 6, Cmd::Status).await; let w0 = res_data(&st)["workspaces"].as_array().unwrap().iter().find(|w| w["id"] == ws).unwrap().clone(); assert!(w0["zoomed"].is_null()); let _ = req(&mut s, 7, Cmd::SetZoom { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), surface_id: Some(spacesh_proto::SurfaceId(sid.clone())), }).await; let _ = req(&mut s, 8, Cmd::Close { surface_id: spacesh_proto::SurfaceId(sid.clone()) }).await; let st = req(&mut s, 9, Cmd::Status).await; let w0 = res_data(&st)["workspaces"].as_array().unwrap().iter().find(|w| w["id"] == ws).unwrap().clone(); assert!(w0["zoomed"].is_null(), "closing the zoomed surface clears zoom"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn close_zoomed_broadcasts_workspace_changed() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); let store: std::sync::Arc = std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let event_store = make_event_store(&dir); let sock_for_task = sock.clone(); let store2 = store.clone(); tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; }); wait_for_socket(&sock).await; // Control connection: open, spawn, zoom. let mut ctrl = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); let r = req(&mut ctrl, 2, Cmd::NewSurface { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 5".into()], cols: 80, rows: 24, }).await; let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); let _ = req(&mut ctrl, 3, Cmd::SetZoom { workspace_id: spacesh_proto::WorkspaceId(ws.clone()), surface_id: Some(spacesh_proto::SurfaceId(sid.clone())), }).await; // Observer connection: must be attached BEFORE the Close so it catches the broadcast. let mut observer = UnixStream::connect(&sock).await.unwrap(); // Close the zoomed surface on the control connection. let _ = req(&mut ctrl, 4, Cmd::Close { surface_id: spacesh_proto::SurfaceId(sid.clone()) }).await; // Observer must receive a WorkspaceChanged for this workspace with zoomed == None. let mut saw_cleared = false; let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2); while tokio::time::Instant::now() < deadline { if let Ok(Ok(Some(env))) = tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await { if let Envelope::Evt(Evt::WorkspaceChanged { workspace }) = env { if workspace.id.0 == ws { assert!(workspace.zoomed.is_none(), "WorkspaceChanged must report cleared zoom"); saw_cleared = true; break; } } } } assert!(saw_cleared, "expected a WorkspaceChanged broadcast with cleared zoom after closing the zoomed surface"); } }