feat(daemon): M2 command dispatch, layout events, cold-start restore, persistence wiring
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -50,6 +50,9 @@ async fn run_daemon() -> Result<()> {
|
|||||||
};
|
};
|
||||||
lifecycle::clear_stale_socket()?;
|
lifecycle::clear_stale_socket()?;
|
||||||
let sock = lifecycle::socket_path()?;
|
let sock = lifecycle::socket_path()?;
|
||||||
|
let state_path = lifecycle::spacesh_dir()?.join("state.json");
|
||||||
|
let store: std::sync::Arc<dyn state_store::StateStore> =
|
||||||
|
std::sync::Arc::new(state_store::JsonStateStore::new(state_path));
|
||||||
eprintln!("spaceshd listening on {}", sock.display());
|
eprintln!("spaceshd listening on {}", sock.display());
|
||||||
server::serve(&sock).await
|
server::serve(&sock, store).await
|
||||||
}
|
}
|
||||||
|
|||||||
+363
-83
@@ -1,14 +1,17 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
use spacesh_proto::codec::{read_frame, write_frame};
|
use spacesh_proto::codec::{read_frame, write_frame};
|
||||||
use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId};
|
use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId, WorkspaceId};
|
||||||
use spacesh_pty::{PtyHandle, SpawnSpec};
|
|
||||||
use tokio::net::{UnixListener, UnixStream};
|
use tokio::net::{UnixListener, UnixStream};
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
use crate::persist::{self, Persister};
|
||||||
use crate::registry::Registry;
|
use crate::registry::Registry;
|
||||||
use crate::surface::{spawn_surface, SurfaceMsg};
|
use crate::state_store::StateStore;
|
||||||
|
use crate::surface::{SurfaceMsg};
|
||||||
|
|
||||||
/// Per-client outbound channel: the router pushes envelopes the client task writes out.
|
/// Per-client outbound channel: the router pushes envelopes the client task writes out.
|
||||||
type ClientTx = mpsc::Sender<Envelope>;
|
type ClientTx = mpsc::Sender<Envelope>;
|
||||||
@@ -29,7 +32,7 @@ enum ServerMsg {
|
|||||||
|
|
||||||
type ClientId = u64;
|
type ClientId = u64;
|
||||||
|
|
||||||
pub async fn serve(socket: &Path) -> Result<()> {
|
pub async fn serve(socket: &Path, store: Arc<dyn StateStore>) -> Result<()> {
|
||||||
let listener = UnixListener::bind(socket)?;
|
let listener = UnixListener::bind(socket)?;
|
||||||
let (router_tx, router_rx) = mpsc::channel::<ServerMsg>(256);
|
let (router_tx, router_rx) = mpsc::channel::<ServerMsg>(256);
|
||||||
|
|
||||||
@@ -42,7 +45,9 @@ pub async fn serve(socket: &Path) -> Result<()> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx));
|
let persister = persist::spawn(store.clone(), Duration::from_millis(500));
|
||||||
|
let initial = store.load().unwrap_or_default();
|
||||||
|
let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx, persister, initial));
|
||||||
|
|
||||||
let mut next_client: ClientId = 0;
|
let mut next_client: ClientId = 0;
|
||||||
loop {
|
loop {
|
||||||
@@ -97,8 +102,11 @@ async fn router(
|
|||||||
mut rx: mpsc::Receiver<ServerMsg>,
|
mut rx: mpsc::Receiver<ServerMsg>,
|
||||||
router_tx: mpsc::Sender<ServerMsg>,
|
router_tx: mpsc::Sender<ServerMsg>,
|
||||||
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
|
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
|
||||||
|
persister: Persister,
|
||||||
|
initial: crate::state_store::PersistState,
|
||||||
) {
|
) {
|
||||||
let mut reg = Registry::new();
|
let mut reg = Registry::new();
|
||||||
|
reg.restore(initial);
|
||||||
let mut clients: HashMap<ClientId, ClientTx> = HashMap::new();
|
let mut clients: HashMap<ClientId, ClientTx> = HashMap::new();
|
||||||
// surface_id → set of client ids subscribed (attached).
|
// surface_id → set of client ids subscribed (attached).
|
||||||
let mut subs: HashMap<SurfaceId, Vec<ClientId>> = HashMap::new();
|
let mut subs: HashMap<SurfaceId, Vec<ClientId>> = HashMap::new();
|
||||||
@@ -125,11 +133,13 @@ async fn router(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
ServerMsg::Exit { surface_id, code } => {
|
ServerMsg::Exit { surface_id, code } => {
|
||||||
|
// Transition running -> stopped; keep panel + tree.
|
||||||
|
reg.mark_stopped(&surface_id);
|
||||||
let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code });
|
let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code });
|
||||||
broadcast_evt(&clients, &evt);
|
broadcast_evt(&clients, &evt);
|
||||||
}
|
}
|
||||||
ServerMsg::Request { id, cmd, client, out } => {
|
ServerMsg::Request { id, cmd, client, out } => {
|
||||||
handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx).await;
|
handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &persister).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -149,6 +159,15 @@ fn err(id: u64, code: &str, msg: &str) -> Envelope {
|
|||||||
error: Some(ErrorBody { code: code.into(), msg: msg.into() }) }
|
error: Some(ErrorBody { code: code.into(), msg: msg.into() }) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Emit a `layout_changed` event for a workspace's current tree.
|
||||||
|
fn emit_layout(reg: &Registry, ws_id: &WorkspaceId, clients: &HashMap<ClientId, ClientTx>) {
|
||||||
|
if let Some(w) = reg.workspace(ws_id) {
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::LayoutChanged {
|
||||||
|
workspace_id: ws_id.clone(), layout: w.layout.clone(),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn handle_request(
|
async fn handle_request(
|
||||||
id: u64,
|
id: u64,
|
||||||
@@ -160,116 +179,299 @@ async fn handle_request(
|
|||||||
clients: &HashMap<ClientId, ClientTx>,
|
clients: &HashMap<ClientId, ClientTx>,
|
||||||
router_tx: &mpsc::Sender<ServerMsg>,
|
router_tx: &mpsc::Sender<ServerMsg>,
|
||||||
exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>,
|
exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>,
|
||||||
|
persister: &Persister,
|
||||||
) {
|
) {
|
||||||
|
use spacesh_proto::message::{SplitDir, Edge};
|
||||||
|
use spacesh_proto::layout::{LayoutNode, Orient};
|
||||||
|
use spacesh_proto::workspace::SurfaceSpec;
|
||||||
|
|
||||||
match cmd {
|
match cmd {
|
||||||
Cmd::Open { path } => {
|
Cmd::Open { path } => {
|
||||||
let meta = reg.open_workspace(path.into());
|
let (ws_id, created) = reg.open_workspace(path.into());
|
||||||
let _ = out.send(ok(id, serde_json::json!({ "workspace_id": meta.id.0 }))).await;
|
if created {
|
||||||
|
if let Some(view) = reg.workspace_view(&ws_id) {
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view }));
|
||||||
}
|
}
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
}
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({ "workspace_id": ws_id.0 }))).await;
|
||||||
|
}
|
||||||
|
|
||||||
Cmd::NewSurface { workspace_id, command, args, cols, rows } => {
|
Cmd::NewSurface { workspace_id, command, args, cols, rows } => {
|
||||||
let Some(ws) = reg.workspace(&workspace_id).cloned() else {
|
let Some(ws) = reg.workspace(&workspace_id).cloned() else {
|
||||||
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await;
|
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return;
|
||||||
return;
|
|
||||||
};
|
};
|
||||||
let sid = reg.new_surface_id();
|
let sid = reg.new_surface_id();
|
||||||
let shell = command.unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into()));
|
let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into()));
|
||||||
let spec = SpawnSpec {
|
let spec = SurfaceSpec {
|
||||||
command: shell,
|
command: shell, args: args.clone(), cwd: ws.path.clone(),
|
||||||
args,
|
agent_label: command, cols, rows, autostart: false,
|
||||||
cwd: ws.path.clone(),
|
|
||||||
cols,
|
|
||||||
rows,
|
|
||||||
env: vec![("SPACESH_SURFACE_ID".into(), sid.0.clone())],
|
|
||||||
};
|
};
|
||||||
match PtyHandle::spawn(spec) {
|
match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, exit_tx.clone()) {
|
||||||
Ok(pty) => {
|
Ok(handle) => {
|
||||||
let handle = spawn_surface(sid.clone(), workspace_id.clone(), pty, cols, rows, exit_tx.clone());
|
|
||||||
// Bridge the surface's broadcast into the router as Output messages.
|
|
||||||
spawn_output_bridge(sid.clone(), &handle, router_tx.clone());
|
spawn_output_bridge(sid.clone(), &handle, router_tx.clone());
|
||||||
reg.insert_surface(handle);
|
reg.set_live(handle);
|
||||||
let created = Envelope::Evt(Evt::SurfaceCreated {
|
reg.add_surface_spec(&workspace_id, sid.clone(), spec);
|
||||||
|
// First panel of an empty workspace becomes the root leaf.
|
||||||
|
if let Some(w) = reg.workspace_mut(&workspace_id) {
|
||||||
|
if w.layout.is_none() {
|
||||||
|
w.layout = Some(LayoutNode::leaf(sid.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated {
|
||||||
surface_id: sid.clone(), workspace_id: workspace_id.clone(),
|
surface_id: sid.clone(), workspace_id: workspace_id.clone(),
|
||||||
});
|
}));
|
||||||
broadcast_evt(clients, &created);
|
emit_layout(reg, &workspace_id, clients);
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
let _ = out.send(ok(id, serde_json::json!({ "surface_id": sid.0 }))).await;
|
let _ = out.send(ok(id, serde_json::json!({ "surface_id": sid.0 }))).await;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; }
|
||||||
let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Cmd::SplitSurface { surface_id, dir, command, args } => {
|
||||||
|
let Some(ws_id) = reg.workspace_of(&surface_id) else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
|
||||||
|
};
|
||||||
|
let ws = reg.workspace(&ws_id).cloned().unwrap();
|
||||||
|
let new_sid = reg.new_surface_id();
|
||||||
|
let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into()));
|
||||||
|
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
|
||||||
|
match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, exit_tx.clone()) {
|
||||||
|
Ok(handle) => {
|
||||||
|
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
|
||||||
|
reg.set_live(handle);
|
||||||
|
reg.add_surface_spec(&ws_id, new_sid.clone(), spec);
|
||||||
|
let orient = match dir { SplitDir::Right => Orient::H, SplitDir::Down => Orient::V };
|
||||||
|
if let Some(w) = reg.workspace_mut(&ws_id) {
|
||||||
|
let mut root = w.layout.take().unwrap_or_else(|| LayoutNode::leaf(surface_id.clone()));
|
||||||
|
spacesh_core::ops::split_leaf(&mut root, &surface_id, orient, true, new_sid.clone());
|
||||||
|
w.layout = Some(root);
|
||||||
}
|
}
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: new_sid.clone(), workspace_id: ws_id.clone() }));
|
||||||
|
emit_layout(reg, &ws_id, clients);
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({ "surface_id": new_sid.0 }))).await;
|
||||||
|
}
|
||||||
|
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Cmd::SetRatios { workspace_id, node_path, ratios } => {
|
||||||
|
let ok_set = reg.workspace_mut(&workspace_id).map(|w| {
|
||||||
|
if let Some(l) = w.layout.as_mut() {
|
||||||
|
spacesh_core::ops::set_ratios(l, &node_path, &ratios)
|
||||||
|
} else { false }
|
||||||
|
}).unwrap_or(false);
|
||||||
|
if ok_set {
|
||||||
|
emit_layout(reg, &workspace_id, clients);
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
} else {
|
||||||
|
let _ = out.send(err(id, "BAD_REQUEST", "invalid node_path or ratios")).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Cmd::MoveSurface { surface_id, target_surface_id, edge } => {
|
||||||
|
let Some(ws_id) = reg.workspace_of(&surface_id) else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
|
||||||
|
};
|
||||||
|
if let Some(w) = reg.workspace_mut(&ws_id) {
|
||||||
|
if let Some(root) = w.layout.take() {
|
||||||
|
w.layout = Some(spacesh_core::ops::move_leaf(root, &surface_id, &target_surface_id, edge));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
emit_layout(reg, &ws_id, clients);
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Cmd::ApplyPreset { workspace_id, preset_id, slots } => {
|
||||||
|
let Some(count) = spacesh_core::presets::slot_count(&preset_id) else {
|
||||||
|
let _ = out.send(err(id, "BAD_REQUEST", "unknown preset")).await; return;
|
||||||
|
};
|
||||||
|
let Some(ws) = reg.workspace(&workspace_id).cloned() else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return;
|
||||||
|
};
|
||||||
|
// Kill current panels of this workspace.
|
||||||
|
let existing: Vec<SurfaceId> = ws.surfaces.keys().cloned().collect();
|
||||||
|
for sid in &existing {
|
||||||
|
if let Some(h) = reg.live(sid) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; }
|
||||||
|
reg.remove_surface(sid);
|
||||||
|
subs.remove(sid);
|
||||||
|
}
|
||||||
|
// Spawn `count` panels (slots padded/truncated to count).
|
||||||
|
let mut new_ids = Vec::new();
|
||||||
|
for i in 0..count {
|
||||||
|
let slot = slots.get(i);
|
||||||
|
let new_sid = reg.new_surface_id();
|
||||||
|
let command = slot.and_then(|s| s.command.clone());
|
||||||
|
let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into()));
|
||||||
|
let args = slot.map(|s| s.args.clone()).unwrap_or_default();
|
||||||
|
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
|
||||||
|
match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, exit_tx.clone()) {
|
||||||
|
Ok(handle) => {
|
||||||
|
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
|
||||||
|
reg.set_live(handle);
|
||||||
|
reg.add_surface_spec(&workspace_id, new_sid.clone(), spec);
|
||||||
|
new_ids.push(new_sid);
|
||||||
|
}
|
||||||
|
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; return; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(tree) = spacesh_core::presets::build(&preset_id, &new_ids) {
|
||||||
|
if let Some(w) = reg.workspace_mut(&workspace_id) { w.layout = Some(tree); }
|
||||||
|
}
|
||||||
|
for sid in &new_ids {
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: sid.clone(), workspace_id: workspace_id.clone() }));
|
||||||
|
}
|
||||||
|
emit_layout(reg, &workspace_id, clients);
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({ "surface_ids": new_ids.iter().map(|s| s.0.clone()).collect::<Vec<_>>() }))).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Cmd::RestartSurface { surface_id } => {
|
||||||
|
if reg.is_running(&surface_id) {
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await; return; // already running
|
||||||
|
}
|
||||||
|
let Some(spec) = reg.surface_spec(&surface_id) else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
|
||||||
|
};
|
||||||
|
let ws_id = reg.workspace_of(&surface_id).unwrap();
|
||||||
|
match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, exit_tx.clone()) {
|
||||||
|
Ok(handle) => {
|
||||||
|
spawn_output_bridge(surface_id.clone(), &handle, router_tx.clone());
|
||||||
|
reg.set_live(handle);
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceRestarted { surface_id: surface_id.clone() }));
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
}
|
||||||
|
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Cmd::CloseWorkspace { workspace_id } => {
|
||||||
|
let ids = reg.close_workspace(&workspace_id);
|
||||||
|
for sid in &ids { subs.remove(sid); }
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceClosed { workspace_id: workspace_id.clone() }));
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Cmd::SetWorkspaceMeta { workspace_id, name, group_id, unread, order } => {
|
||||||
|
let found = reg.workspace_mut(&workspace_id).map(|w| {
|
||||||
|
if let Some(n) = name { w.name = n; }
|
||||||
|
if let Some(g) = group_id { w.group_id = g; }
|
||||||
|
if let Some(u) = unread { w.unread = u; }
|
||||||
|
if let Some(o) = order { w.order = o; }
|
||||||
|
}).is_some();
|
||||||
|
if found {
|
||||||
|
if let Some(view) = reg.workspace_view(&workspace_id) {
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view }));
|
||||||
|
}
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
} else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Cmd::CreateGroup { name, color } => {
|
||||||
|
let gid = reg.create_group(name, color);
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() }));
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({ "group_id": gid.0 }))).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Cmd::SetGroup { group_id, name, color, order } => {
|
||||||
|
let found = reg.group_mut(&group_id).map(|g| {
|
||||||
|
if let Some(n) = name { g.name = n; }
|
||||||
|
if let Some(c) = color { g.color = c; }
|
||||||
|
if let Some(o) = order { g.order = o; }
|
||||||
|
}).is_some();
|
||||||
|
if found {
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() }));
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
} else {
|
||||||
|
let _ = out.send(err(id, "NOT_FOUND", "group")).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Cmd::DeleteGroup { group_id } => {
|
||||||
|
reg.delete_group(&group_id);
|
||||||
|
broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() }));
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
|
}
|
||||||
|
|
||||||
Cmd::Input { surface_id, bytes } => {
|
Cmd::Input { surface_id, bytes } => {
|
||||||
let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&bytes) else {
|
let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&bytes) else {
|
||||||
let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await;
|
let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await; return;
|
||||||
return;
|
|
||||||
};
|
};
|
||||||
if let Some(s) = reg.surface(&surface_id) {
|
if let Some(s) = reg.live(&surface_id) {
|
||||||
let _ = s.tx.send(SurfaceMsg::Input(decoded)).await;
|
let _ = s.tx.send(crate::surface::SurfaceMsg::Input(decoded)).await;
|
||||||
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
} else {
|
} else {
|
||||||
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Cmd::Resize { surface_id, cols, rows } => {
|
Cmd::Resize { surface_id, cols, rows } => {
|
||||||
if let Some(s) = reg.surface(&surface_id) {
|
if let Some(s) = reg.live(&surface_id) {
|
||||||
let _ = s.tx.send(SurfaceMsg::Resize { cols, rows }).await;
|
let _ = s.tx.send(crate::surface::SurfaceMsg::Resize { cols, rows }).await;
|
||||||
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
} else {
|
} else {
|
||||||
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Cmd::Attach { surface_id } => {
|
Cmd::Attach { surface_id } => {
|
||||||
if let Some(s) = reg.surface(&surface_id) {
|
if let Some(s) = reg.live(&surface_id) {
|
||||||
let (reply_tx, reply_rx) = oneshot::channel();
|
let (reply_tx, reply_rx) = oneshot::channel();
|
||||||
if s.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.is_ok() {
|
if s.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.is_ok() {
|
||||||
if let Ok((snap, _sub)) = reply_rx.await {
|
if let Ok((snap, _sub)) = reply_rx.await {
|
||||||
subs.entry(surface_id.clone()).or_default().push(client);
|
subs.entry(surface_id.clone()).or_default().push(client);
|
||||||
let _ = out.send(ok(id, serde_json::json!({
|
let _ = out.send(ok(id, serde_json::json!({
|
||||||
"snapshot": snap.ansi,
|
"snapshot": snap.ansi, "cols": snap.cols, "rows": snap.rows,
|
||||||
"cols": snap.cols,
|
"cursor_row": snap.cursor_row, "cursor_col": snap.cursor_col,
|
||||||
"rows": snap.rows,
|
|
||||||
"cursor_row": snap.cursor_row,
|
|
||||||
"cursor_col": snap.cursor_col,
|
|
||||||
}))).await;
|
}))).await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = out.send(err(id, "INTERNAL", "attach failed")).await;
|
let _ = out.send(err(id, "INTERNAL", "attach failed")).await;
|
||||||
} else {
|
} else {
|
||||||
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
// stopped panel: no live stream, return an empty snapshot so the GUI shows the restart overlay.
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0, "stopped": true }))).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Cmd::Detach { surface_id } => {
|
Cmd::Detach { surface_id } => {
|
||||||
if let Some(list) = subs.get_mut(&surface_id) {
|
if let Some(list) = subs.get_mut(&surface_id) { list.retain(|c| *c != client); }
|
||||||
list.retain(|c| *c != client);
|
|
||||||
}
|
|
||||||
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
||||||
}
|
|
||||||
Cmd::Focus { surface_id: _ } => {
|
|
||||||
// Focus is a no-op in this slice (window raise is GUI-side; CLI parity later).
|
|
||||||
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Cmd::Focus { surface_id: _ } => { let _ = out.send(ok(id, serde_json::Value::Null)).await; }
|
||||||
|
|
||||||
Cmd::Close { surface_id } => {
|
Cmd::Close { surface_id } => {
|
||||||
if let Some(handle) = reg.remove_surface(&surface_id) {
|
if reg.surface_spec(&surface_id).is_some() {
|
||||||
let _ = handle.tx.send(SurfaceMsg::Close).await;
|
if let Some(h) = reg.live(&surface_id) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; }
|
||||||
|
let ws_id = reg.workspace_of(&surface_id);
|
||||||
|
reg.remove_surface(&surface_id);
|
||||||
subs.remove(&surface_id);
|
subs.remove(&surface_id);
|
||||||
let closed = Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() });
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() }));
|
||||||
broadcast_evt(clients, &closed);
|
if let Some(ws_id) = ws_id { emit_layout(reg, &ws_id, clients); }
|
||||||
|
persister.mark_dirty(reg.persist_state());
|
||||||
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
} else {
|
} else {
|
||||||
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Cmd::Status => {
|
Cmd::Status => {
|
||||||
let workspaces: Vec<_> = reg.status().into_iter().map(|(w, sids)| {
|
let (groups, workspaces) = reg.status();
|
||||||
serde_json::json!({
|
let _ = out.send(ok(id, serde_json::json!({ "groups": groups, "workspaces": workspaces }))).await;
|
||||||
"workspace_id": w.id.0,
|
|
||||||
"path": w.path.to_string_lossy(),
|
|
||||||
"surfaces": sids.iter().map(|s| s.0.clone()).collect::<Vec<_>>(),
|
|
||||||
})
|
|
||||||
}).collect();
|
|
||||||
let _ = out.send(ok(id, serde_json::json!({ "workspaces": workspaces }))).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Cmd::Shutdown => {
|
Cmd::Shutdown => {
|
||||||
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
std::process::exit(0);
|
std::process::exit(0);
|
||||||
@@ -321,13 +523,36 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn res_data(env: &Envelope) -> &serde_json::Value {
|
||||||
|
match env { Envelope::Res { data, .. } => data, _ => panic!("not a res") }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tempdir_path() -> std::path::PathBuf {
|
||||||
|
let mut p = std::env::temp_dir();
|
||||||
|
let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos();
|
||||||
|
p.push(format!("spaceshd-test-{n}"));
|
||||||
|
std::fs::create_dir_all(&p).unwrap();
|
||||||
|
p
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_for_socket(sock: &Path) {
|
||||||
|
for _ in 0..300 {
|
||||||
|
if UnixStream::connect(sock).await.is_ok() { return; }
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
|
||||||
|
}
|
||||||
|
panic!("socket never came up");
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn open_new_surface_attach_streams_output() {
|
async fn open_new_surface_attach_streams_output() {
|
||||||
let _serial = crate::test_support::serial();
|
let _serial = crate::test_support::serial();
|
||||||
let dir = tempdir_path();
|
let dir = tempdir_path();
|
||||||
let sock = dir.join("sock");
|
let sock = dir.join("sock");
|
||||||
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||||
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task).await; });
|
let store2 = store.clone();
|
||||||
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
@@ -364,8 +589,11 @@ mod tests {
|
|||||||
let _serial = crate::test_support::serial();
|
let _serial = crate::test_support::serial();
|
||||||
let dir = tempdir_path();
|
let dir = tempdir_path();
|
||||||
let sock = dir.join("sock");
|
let sock = dir.join("sock");
|
||||||
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||||
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task).await; });
|
let store2 = store.clone();
|
||||||
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
let r = req(&mut s, 1, Cmd::Input {
|
let r = req(&mut s, 1, Cmd::Input {
|
||||||
@@ -381,33 +609,16 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn res_data(env: &Envelope) -> &serde_json::Value {
|
|
||||||
match env { Envelope::Res { data, .. } => data, _ => panic!("not a res") }
|
|
||||||
}
|
|
||||||
|
|
||||||
fn tempdir_path() -> std::path::PathBuf {
|
|
||||||
let mut p = std::env::temp_dir();
|
|
||||||
let n = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos();
|
|
||||||
p.push(format!("spaceshd-test-{n}"));
|
|
||||||
std::fs::create_dir_all(&p).unwrap();
|
|
||||||
p
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_for_socket(sock: &Path) {
|
|
||||||
for _ in 0..300 {
|
|
||||||
if UnixStream::connect(sock).await.is_ok() { return; }
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
|
|
||||||
}
|
|
||||||
panic!("socket never came up");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn reattach_returns_snapshot_with_prior_output() {
|
async fn reattach_returns_snapshot_with_prior_output() {
|
||||||
let _serial = crate::test_support::serial();
|
let _serial = crate::test_support::serial();
|
||||||
let dir = tempdir_path();
|
let dir = tempdir_path();
|
||||||
let sock = dir.join("sock");
|
let sock = dir.join("sock");
|
||||||
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||||
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task).await; });
|
let store2 = store.clone();
|
||||||
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
// First client: open, new surface that prints a marker, attach, then disconnect.
|
// First client: open, new surface that prints a marker, attach, then disconnect.
|
||||||
@@ -436,4 +647,73 @@ mod tests {
|
|||||||
let snap = res_data(&r)["snapshot"].as_str().unwrap();
|
let snap = res_data(&r)["snapshot"].as_str().unwrap();
|
||||||
assert!(snap.contains("REPAINT_ME"), "snapshot was: {snap:?}");
|
assert!(snap.contains("REPAINT_ME"), "snapshot was: {snap:?}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn apply_preset_builds_tree_and_status_reports_it() {
|
||||||
|
let _serial = crate::test_support::serial();
|
||||||
|
let dir = tempdir_path();
|
||||||
|
let sock = dir.join("sock");
|
||||||
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||||
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||||
|
let sock2 = sock.clone();
|
||||||
|
tokio::spawn(async move { let _ = serve(&sock2, store).await; });
|
||||||
|
wait_for_socket(&sock).await;
|
||||||
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
|
||||||
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
||||||
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
||||||
|
let r = req(&mut s, 2, Cmd::ApplyPreset {
|
||||||
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
||||||
|
preset_id: "2x2".into(),
|
||||||
|
slots: vec![],
|
||||||
|
}).await;
|
||||||
|
let ids = res_data(&r)["surface_ids"].as_array().unwrap();
|
||||||
|
assert_eq!(ids.len(), 4);
|
||||||
|
|
||||||
|
let r = req(&mut s, 3, Cmd::Status).await;
|
||||||
|
let wss = res_data(&r)["workspaces"].as_array().unwrap();
|
||||||
|
let w0 = wss.iter().find(|w| w["id"] == ws).unwrap();
|
||||||
|
assert!(w0["layout"].is_object(), "layout tree present");
|
||||||
|
assert!(w0["layout"].to_string().contains("split"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn cold_restart_restores_structure_stopped() {
|
||||||
|
let _serial = crate::test_support::serial();
|
||||||
|
let dir = tempdir_path();
|
||||||
|
let state_path = dir.join("state.json");
|
||||||
|
let sock = dir.join("sock");
|
||||||
|
let ws;
|
||||||
|
{
|
||||||
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||||
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
||||||
|
let sock2 = sock.clone();
|
||||||
|
tokio::spawn(async move { let _ = serve(&sock2, store).await; });
|
||||||
|
wait_for_socket(&sock).await;
|
||||||
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
||||||
|
ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
||||||
|
req(&mut s, 2, Cmd::ApplyPreset {
|
||||||
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()), preset_id: "2tb".into(), slots: vec![],
|
||||||
|
}).await;
|
||||||
|
// allow debounce (500ms) to flush state.json
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(900)).await;
|
||||||
|
}
|
||||||
|
// "cold start": new store on the same state file, new socket.
|
||||||
|
let sock_b = dir.join("sock2");
|
||||||
|
let store_b: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||||
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
||||||
|
let sb2 = sock_b.clone();
|
||||||
|
tokio::spawn(async move { let _ = serve(&sock_b, store_b).await; });
|
||||||
|
wait_for_socket(&sb2).await;
|
||||||
|
let mut s2 = UnixStream::connect(&sb2).await.unwrap();
|
||||||
|
let r = req(&mut s2, 1, Cmd::Status).await;
|
||||||
|
let wss = res_data(&r)["workspaces"].as_array().unwrap();
|
||||||
|
let w0 = wss.iter().find(|w| w["id"] == ws).expect("workspace restored");
|
||||||
|
let surfaces = w0["surfaces"].as_object().unwrap();
|
||||||
|
assert_eq!(surfaces.len(), 2, "2tb panels restored");
|
||||||
|
for (_id, sv) in surfaces {
|
||||||
|
assert_eq!(sv["running"], false, "restored panels are stopped");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user