From 62a65b691d6adb6b5771ac0f4ee658508d6d02bb Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Tue, 9 Jun 2026 21:25:35 +0700 Subject: [PATCH] feat(daemon): M2 command dispatch, layout events, cold-start restore, persistence wiring Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/spaceshd/src/main.rs | 5 +- crates/spaceshd/src/server.rs | 446 +++++++++++++++++++++++++++------- 2 files changed, 367 insertions(+), 84 deletions(-) diff --git a/crates/spaceshd/src/main.rs b/crates/spaceshd/src/main.rs index ed7908f..23a9f24 100644 --- a/crates/spaceshd/src/main.rs +++ b/crates/spaceshd/src/main.rs @@ -50,6 +50,9 @@ async fn run_daemon() -> Result<()> { }; lifecycle::clear_stale_socket()?; let sock = lifecycle::socket_path()?; + let state_path = lifecycle::spacesh_dir()?.join("state.json"); + let store: std::sync::Arc = + std::sync::Arc::new(state_store::JsonStateStore::new(state_path)); eprintln!("spaceshd listening on {}", sock.display()); - server::serve(&sock).await + server::serve(&sock, store).await } diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs index 5c82164..978a81c 100644 --- a/crates/spaceshd/src/server.rs +++ b/crates/spaceshd/src/server.rs @@ -1,14 +1,17 @@ use std::collections::HashMap; use std::path::Path; +use std::sync::Arc; +use std::time::Duration; use anyhow::Result; use base64::Engine; use spacesh_proto::codec::{read_frame, write_frame}; -use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId}; -use spacesh_pty::{PtyHandle, SpawnSpec}; +use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId, WorkspaceId}; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, oneshot}; +use crate::persist::{self, Persister}; use crate::registry::Registry; -use crate::surface::{spawn_surface, SurfaceMsg}; +use crate::state_store::StateStore; +use crate::surface::{SurfaceMsg}; /// Per-client outbound channel: the router pushes envelopes the client task writes out. type ClientTx = mpsc::Sender; @@ -29,7 +32,7 @@ enum ServerMsg { type ClientId = u64; -pub async fn serve(socket: &Path) -> Result<()> { +pub async fn serve(socket: &Path, store: Arc) -> Result<()> { let listener = UnixListener::bind(socket)?; let (router_tx, router_rx) = mpsc::channel::(256); @@ -42,7 +45,9 @@ pub async fn serve(socket: &Path) -> Result<()> { } }); - let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx)); + let persister = persist::spawn(store.clone(), Duration::from_millis(500)); + let initial = store.load().unwrap_or_default(); + let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx, persister, initial)); let mut next_client: ClientId = 0; loop { @@ -97,8 +102,11 @@ async fn router( mut rx: mpsc::Receiver, router_tx: mpsc::Sender, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, + persister: Persister, + initial: crate::state_store::PersistState, ) { let mut reg = Registry::new(); + reg.restore(initial); let mut clients: HashMap = HashMap::new(); // surface_id → set of client ids subscribed (attached). let mut subs: HashMap> = HashMap::new(); @@ -125,11 +133,13 @@ async fn router( } } ServerMsg::Exit { surface_id, code } => { + // Transition running -> stopped; keep panel + tree. + reg.mark_stopped(&surface_id); let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code }); broadcast_evt(&clients, &evt); } ServerMsg::Request { id, cmd, client, out } => { - handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx).await; + handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &persister).await; } } } @@ -149,6 +159,15 @@ fn err(id: u64, code: &str, msg: &str) -> Envelope { error: Some(ErrorBody { code: code.into(), msg: msg.into() }) } } +/// Emit a `layout_changed` event for a workspace's current tree. +fn emit_layout(reg: &Registry, ws_id: &WorkspaceId, clients: &HashMap) { + if let Some(w) = reg.workspace(ws_id) { + broadcast_evt(clients, &Envelope::Evt(Evt::LayoutChanged { + workspace_id: ws_id.clone(), layout: w.layout.clone(), + })); + } +} + #[allow(clippy::too_many_arguments)] async fn handle_request( id: u64, @@ -160,116 +179,299 @@ async fn handle_request( clients: &HashMap, router_tx: &mpsc::Sender, exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>, + persister: &Persister, ) { + use spacesh_proto::message::{SplitDir, Edge}; + use spacesh_proto::layout::{LayoutNode, Orient}; + use spacesh_proto::workspace::SurfaceSpec; + match cmd { Cmd::Open { path } => { - let meta = reg.open_workspace(path.into()); - let _ = out.send(ok(id, serde_json::json!({ "workspace_id": meta.id.0 }))).await; + let (ws_id, created) = reg.open_workspace(path.into()); + if created { + if let Some(view) = reg.workspace_view(&ws_id) { + broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view })); + } + persister.mark_dirty(reg.persist_state()); + } + let _ = out.send(ok(id, serde_json::json!({ "workspace_id": ws_id.0 }))).await; } + Cmd::NewSurface { workspace_id, command, args, cols, rows } => { let Some(ws) = reg.workspace(&workspace_id).cloned() else { - let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; - return; + let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return; }; let sid = reg.new_surface_id(); - let shell = command.unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into())); - let spec = SpawnSpec { - command: shell, - args, - cwd: ws.path.clone(), - cols, - rows, - env: vec![("SPACESH_SURFACE_ID".into(), sid.0.clone())], + let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into())); + let spec = SurfaceSpec { + command: shell, args: args.clone(), cwd: ws.path.clone(), + agent_label: command, cols, rows, autostart: false, }; - match PtyHandle::spawn(spec) { - Ok(pty) => { - let handle = spawn_surface(sid.clone(), workspace_id.clone(), pty, cols, rows, exit_tx.clone()); - // Bridge the surface's broadcast into the router as Output messages. + match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, exit_tx.clone()) { + Ok(handle) => { spawn_output_bridge(sid.clone(), &handle, router_tx.clone()); - reg.insert_surface(handle); - let created = Envelope::Evt(Evt::SurfaceCreated { + reg.set_live(handle); + reg.add_surface_spec(&workspace_id, sid.clone(), spec); + // First panel of an empty workspace becomes the root leaf. + if let Some(w) = reg.workspace_mut(&workspace_id) { + if w.layout.is_none() { + w.layout = Some(LayoutNode::leaf(sid.clone())); + } + } + broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: sid.clone(), workspace_id: workspace_id.clone(), - }); - broadcast_evt(clients, &created); + })); + emit_layout(reg, &workspace_id, clients); + persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::json!({ "surface_id": sid.0 }))).await; } - Err(e) => { - let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; + Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; } + } + } + + Cmd::SplitSurface { surface_id, dir, command, args } => { + let Some(ws_id) = reg.workspace_of(&surface_id) else { + let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return; + }; + let ws = reg.workspace(&ws_id).cloned().unwrap(); + let new_sid = reg.new_surface_id(); + let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into())); + let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false }; + match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, exit_tx.clone()) { + Ok(handle) => { + spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone()); + reg.set_live(handle); + reg.add_surface_spec(&ws_id, new_sid.clone(), spec); + let orient = match dir { SplitDir::Right => Orient::H, SplitDir::Down => Orient::V }; + if let Some(w) = reg.workspace_mut(&ws_id) { + let mut root = w.layout.take().unwrap_or_else(|| LayoutNode::leaf(surface_id.clone())); + spacesh_core::ops::split_leaf(&mut root, &surface_id, orient, true, new_sid.clone()); + w.layout = Some(root); + } + broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: new_sid.clone(), workspace_id: ws_id.clone() })); + emit_layout(reg, &ws_id, clients); + persister.mark_dirty(reg.persist_state()); + let _ = out.send(ok(id, serde_json::json!({ "surface_id": new_sid.0 }))).await; + } + Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; } + } + } + + Cmd::SetRatios { workspace_id, node_path, ratios } => { + let ok_set = reg.workspace_mut(&workspace_id).map(|w| { + if let Some(l) = w.layout.as_mut() { + spacesh_core::ops::set_ratios(l, &node_path, &ratios) + } else { false } + }).unwrap_or(false); + if ok_set { + emit_layout(reg, &workspace_id, clients); + persister.mark_dirty(reg.persist_state()); + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } else { + let _ = out.send(err(id, "BAD_REQUEST", "invalid node_path or ratios")).await; + } + } + + Cmd::MoveSurface { surface_id, target_surface_id, edge } => { + let Some(ws_id) = reg.workspace_of(&surface_id) else { + let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return; + }; + if let Some(w) = reg.workspace_mut(&ws_id) { + if let Some(root) = w.layout.take() { + w.layout = Some(spacesh_core::ops::move_leaf(root, &surface_id, &target_surface_id, edge)); } } + emit_layout(reg, &ws_id, clients); + persister.mark_dirty(reg.persist_state()); + let _ = out.send(ok(id, serde_json::Value::Null)).await; } + + Cmd::ApplyPreset { workspace_id, preset_id, slots } => { + let Some(count) = spacesh_core::presets::slot_count(&preset_id) else { + let _ = out.send(err(id, "BAD_REQUEST", "unknown preset")).await; return; + }; + let Some(ws) = reg.workspace(&workspace_id).cloned() else { + let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return; + }; + // Kill current panels of this workspace. + let existing: Vec = ws.surfaces.keys().cloned().collect(); + for sid in &existing { + if let Some(h) = reg.live(sid) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; } + reg.remove_surface(sid); + subs.remove(sid); + } + // Spawn `count` panels (slots padded/truncated to count). + let mut new_ids = Vec::new(); + for i in 0..count { + let slot = slots.get(i); + let new_sid = reg.new_surface_id(); + let command = slot.and_then(|s| s.command.clone()); + let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into())); + let args = slot.map(|s| s.args.clone()).unwrap_or_default(); + let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false }; + match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, exit_tx.clone()) { + Ok(handle) => { + spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone()); + reg.set_live(handle); + reg.add_surface_spec(&workspace_id, new_sid.clone(), spec); + new_ids.push(new_sid); + } + Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; return; } + } + } + if let Some(tree) = spacesh_core::presets::build(&preset_id, &new_ids) { + if let Some(w) = reg.workspace_mut(&workspace_id) { w.layout = Some(tree); } + } + for sid in &new_ids { + broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: sid.clone(), workspace_id: workspace_id.clone() })); + } + emit_layout(reg, &workspace_id, clients); + persister.mark_dirty(reg.persist_state()); + let _ = out.send(ok(id, serde_json::json!({ "surface_ids": new_ids.iter().map(|s| s.0.clone()).collect::>() }))).await; + } + + Cmd::RestartSurface { surface_id } => { + if reg.is_running(&surface_id) { + let _ = out.send(ok(id, serde_json::Value::Null)).await; return; // already running + } + let Some(spec) = reg.surface_spec(&surface_id) else { + let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return; + }; + let ws_id = reg.workspace_of(&surface_id).unwrap(); + match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, exit_tx.clone()) { + Ok(handle) => { + spawn_output_bridge(surface_id.clone(), &handle, router_tx.clone()); + reg.set_live(handle); + broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceRestarted { surface_id: surface_id.clone() })); + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } + Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; } + } + } + + Cmd::CloseWorkspace { workspace_id } => { + let ids = reg.close_workspace(&workspace_id); + for sid in &ids { subs.remove(sid); } + broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceClosed { workspace_id: workspace_id.clone() })); + persister.mark_dirty(reg.persist_state()); + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } + + Cmd::SetWorkspaceMeta { workspace_id, name, group_id, unread, order } => { + let found = reg.workspace_mut(&workspace_id).map(|w| { + if let Some(n) = name { w.name = n; } + if let Some(g) = group_id { w.group_id = g; } + if let Some(u) = unread { w.unread = u; } + if let Some(o) = order { w.order = o; } + }).is_some(); + if found { + if let Some(view) = reg.workspace_view(&workspace_id) { + broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view })); + } + persister.mark_dirty(reg.persist_state()); + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } else { + let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; + } + } + + Cmd::CreateGroup { name, color } => { + let gid = reg.create_group(name, color); + broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() })); + persister.mark_dirty(reg.persist_state()); + let _ = out.send(ok(id, serde_json::json!({ "group_id": gid.0 }))).await; + } + + Cmd::SetGroup { group_id, name, color, order } => { + let found = reg.group_mut(&group_id).map(|g| { + if let Some(n) = name { g.name = n; } + if let Some(c) = color { g.color = c; } + if let Some(o) = order { g.order = o; } + }).is_some(); + if found { + broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() })); + persister.mark_dirty(reg.persist_state()); + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } else { + let _ = out.send(err(id, "NOT_FOUND", "group")).await; + } + } + + Cmd::DeleteGroup { group_id } => { + reg.delete_group(&group_id); + broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() })); + persister.mark_dirty(reg.persist_state()); + let _ = out.send(ok(id, serde_json::Value::Null)).await; + } + Cmd::Input { surface_id, bytes } => { let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&bytes) else { - let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await; - return; + let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await; return; }; - if let Some(s) = reg.surface(&surface_id) { - let _ = s.tx.send(SurfaceMsg::Input(decoded)).await; + if let Some(s) = reg.live(&surface_id) { + let _ = s.tx.send(crate::surface::SurfaceMsg::Input(decoded)).await; let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } + Cmd::Resize { surface_id, cols, rows } => { - if let Some(s) = reg.surface(&surface_id) { - let _ = s.tx.send(SurfaceMsg::Resize { cols, rows }).await; + if let Some(s) = reg.live(&surface_id) { + let _ = s.tx.send(crate::surface::SurfaceMsg::Resize { cols, rows }).await; let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } + Cmd::Attach { surface_id } => { - if let Some(s) = reg.surface(&surface_id) { + if let Some(s) = reg.live(&surface_id) { let (reply_tx, reply_rx) = oneshot::channel(); if s.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.is_ok() { if let Ok((snap, _sub)) = reply_rx.await { subs.entry(surface_id.clone()).or_default().push(client); let _ = out.send(ok(id, serde_json::json!({ - "snapshot": snap.ansi, - "cols": snap.cols, - "rows": snap.rows, - "cursor_row": snap.cursor_row, - "cursor_col": snap.cursor_col, + "snapshot": snap.ansi, "cols": snap.cols, "rows": snap.rows, + "cursor_row": snap.cursor_row, "cursor_col": snap.cursor_col, }))).await; return; } } let _ = out.send(err(id, "INTERNAL", "attach failed")).await; } else { - let _ = out.send(err(id, "NOT_FOUND", "surface")).await; + // stopped panel: no live stream, return an empty snapshot so the GUI shows the restart overlay. + let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0, "stopped": true }))).await; } } + Cmd::Detach { surface_id } => { - if let Some(list) = subs.get_mut(&surface_id) { - list.retain(|c| *c != client); - } - let _ = out.send(ok(id, serde_json::Value::Null)).await; - } - Cmd::Focus { surface_id: _ } => { - // Focus is a no-op in this slice (window raise is GUI-side; CLI parity later). + if let Some(list) = subs.get_mut(&surface_id) { list.retain(|c| *c != client); } let _ = out.send(ok(id, serde_json::Value::Null)).await; } + + Cmd::Focus { surface_id: _ } => { let _ = out.send(ok(id, serde_json::Value::Null)).await; } + Cmd::Close { surface_id } => { - if let Some(handle) = reg.remove_surface(&surface_id) { - let _ = handle.tx.send(SurfaceMsg::Close).await; + if reg.surface_spec(&surface_id).is_some() { + if let Some(h) = reg.live(&surface_id) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; } + let ws_id = reg.workspace_of(&surface_id); + reg.remove_surface(&surface_id); subs.remove(&surface_id); - let closed = Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() }); - broadcast_evt(clients, &closed); + broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() })); + if let Some(ws_id) = ws_id { emit_layout(reg, &ws_id, clients); } + persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; } else { let _ = out.send(err(id, "NOT_FOUND", "surface")).await; } } + Cmd::Status => { - let workspaces: Vec<_> = reg.status().into_iter().map(|(w, sids)| { - serde_json::json!({ - "workspace_id": w.id.0, - "path": w.path.to_string_lossy(), - "surfaces": sids.iter().map(|s| s.0.clone()).collect::>(), - }) - }).collect(); - let _ = out.send(ok(id, serde_json::json!({ "workspaces": workspaces }))).await; + let (groups, workspaces) = reg.status(); + let _ = out.send(ok(id, serde_json::json!({ "groups": groups, "workspaces": workspaces }))).await; } + Cmd::Shutdown => { let _ = out.send(ok(id, serde_json::Value::Null)).await; std::process::exit(0); @@ -321,13 +523,36 @@ mod tests { } } + fn res_data(env: &Envelope) -> &serde_json::Value { + match env { Envelope::Res { data, .. } => data, _ => panic!("not a res") } + } + + fn tempdir_path() -> std::path::PathBuf { + let mut p = std::env::temp_dir(); + let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos(); + p.push(format!("spaceshd-test-{n}")); + std::fs::create_dir_all(&p).unwrap(); + p + } + + async fn wait_for_socket(sock: &Path) { + for _ in 0..300 { + if UnixStream::connect(sock).await.is_ok() { return; } + tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; + } + panic!("socket never came up"); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn open_new_surface_attach_streams_output() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let sock_for_task = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task).await; }); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); @@ -364,8 +589,11 @@ mod tests { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let sock_for_task = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task).await; }); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; }); wait_for_socket(&sock).await; let mut s = UnixStream::connect(&sock).await.unwrap(); let r = req(&mut s, 1, Cmd::Input { @@ -381,33 +609,16 @@ mod tests { } } - fn res_data(env: &Envelope) -> &serde_json::Value { - match env { Envelope::Res { data, .. } => data, _ => panic!("not a res") } - } - - fn tempdir_path() -> std::path::PathBuf { - let mut p = std::env::temp_dir(); - let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos(); - p.push(format!("spaceshd-test-{n}")); - std::fs::create_dir_all(&p).unwrap(); - p - } - - async fn wait_for_socket(sock: &Path) { - for _ in 0..300 { - if UnixStream::connect(sock).await.is_ok() { return; } - tokio::time::sleep(tokio::time::Duration::from_millis(20)).await; - } - panic!("socket never came up"); - } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn reattach_returns_snapshot_with_prior_output() { let _serial = crate::test_support::serial(); let dir = tempdir_path(); let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); let sock_for_task = sock.clone(); - tokio::spawn(async move { let _ = serve(&sock_for_task).await; }); + let store2 = store.clone(); + tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; }); wait_for_socket(&sock).await; // First client: open, new surface that prints a marker, attach, then disconnect. @@ -436,4 +647,73 @@ mod tests { let snap = res_data(&r)["snapshot"].as_str().unwrap(); assert!(snap.contains("REPAINT_ME"), "snapshot was: {snap:?}"); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn apply_preset_builds_tree_and_status_reports_it() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let sock2 = sock.clone(); + tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + wait_for_socket(&sock).await; + let mut s = UnixStream::connect(&sock).await.unwrap(); + + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut s, 2, Cmd::ApplyPreset { + workspace_id: spacesh_proto::WorkspaceId(ws.clone()), + preset_id: "2x2".into(), + slots: vec![], + }).await; + let ids = res_data(&r)["surface_ids"].as_array().unwrap(); + assert_eq!(ids.len(), 4); + + let r = req(&mut s, 3, Cmd::Status).await; + let wss = res_data(&r)["workspaces"].as_array().unwrap(); + let w0 = wss.iter().find(|w| w["id"] == ws).unwrap(); + assert!(w0["layout"].is_object(), "layout tree present"); + assert!(w0["layout"].to_string().contains("split")); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn cold_restart_restores_structure_stopped() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let state_path = dir.join("state.json"); + let sock = dir.join("sock"); + let ws; + { + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); + let sock2 = sock.clone(); + tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + wait_for_socket(&sock).await; + let mut s = UnixStream::connect(&sock).await.unwrap(); + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + req(&mut s, 2, Cmd::ApplyPreset { + workspace_id: spacesh_proto::WorkspaceId(ws.clone()), preset_id: "2tb".into(), slots: vec![], + }).await; + // allow debounce (500ms) to flush state.json + tokio::time::sleep(tokio::time::Duration::from_millis(900)).await; + } + // "cold start": new store on the same state file, new socket. + let sock_b = dir.join("sock2"); + let store_b: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone())); + let sb2 = sock_b.clone(); + tokio::spawn(async move { let _ = serve(&sock_b, store_b).await; }); + wait_for_socket(&sb2).await; + let mut s2 = UnixStream::connect(&sb2).await.unwrap(); + let r = req(&mut s2, 1, Cmd::Status).await; + let wss = res_data(&r)["workspaces"].as_array().unwrap(); + let w0 = wss.iter().find(|w| w["id"] == ws).expect("workspace restored"); + let surfaces = w0["surfaces"].as_object().unwrap(); + assert_eq!(surfaces.len(), 2, "2tb panels restored"); + for (_id, sv) in surfaces { + assert_eq!(sv["running"], false, "restored panels are stopped"); + } + } }