31c08b5387
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1650 lines
80 KiB
Rust
1650 lines
80 KiB
Rust
use std::collections::HashMap;
|
|
use std::path::Path;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use anyhow::Result;
|
|
use base64::Engine;
|
|
use spacesh_proto::codec::{read_frame, write_frame};
|
|
use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId, WorkspaceId};
|
|
use spacesh_proto::status::SurfaceState;
|
|
use tokio::net::{UnixListener, UnixStream};
|
|
use tokio::sync::{mpsc, oneshot};
|
|
use crate::event_log::EventLog;
|
|
use crate::event_store::{self, EventPersister, EventStore};
|
|
use crate::persist::{self, Persister};
|
|
use crate::registry::Registry;
|
|
use crate::snapshot_store::{SnapshotStore, SnapshotMsg, spawn_writer};
|
|
use crate::state_store::StateStore;
|
|
use crate::surface::{SurfaceMsg};
|
|
|
|
/// Per-client outbound channel: the router pushes envelopes the client task writes out.
|
|
type ClientTx = mpsc::Sender<Envelope>;
|
|
|
|
/// Messages into the single router task.
|
|
enum ServerMsg {
|
|
/// A request from a client; reply routed to that client's `out`.
|
|
Request { id: u64, cmd: Cmd, client: ClientId, out: ClientTx },
|
|
/// Forward an output chunk to all subscribers of `surface_id`.
|
|
Output { surface_id: SurfaceId, bytes: Vec<u8> },
|
|
/// A surface process exited.
|
|
Exit { surface_id: SurfaceId, code: i32 },
|
|
/// Register a new client's event sink.
|
|
ClientConnected { client: ClientId, out: ClientTx },
|
|
/// Drop a client and all its subscriptions.
|
|
ClientDisconnected { client: ClientId },
|
|
/// A status change detected internally (OSC 133 / fallback) by a surface actor.
|
|
StateDetected { surface_id: SurfaceId, state: SurfaceState },
|
|
/// Periodic snapshot tick: ask all live surfaces for a snapshot and persist dirty ones.
|
|
SnapshotTick,
|
|
}
|
|
|
|
/// Identifier of one connected client; the accept loop in `serve` hands these out sequentially.
type ClientId = u64;
|
|
|
|
pub async fn serve(socket: &Path, store: Arc<dyn StateStore>, event_store: Arc<dyn EventStore>, snapshot_store: Arc<dyn SnapshotStore>) -> Result<()> {
|
|
let listener = UnixListener::bind(socket)?;
|
|
let (router_tx, router_rx) = mpsc::channel::<ServerMsg>(256);
|
|
|
|
// Exit events from surfaces are funneled into the router.
|
|
let (exit_tx, mut exit_rx) = mpsc::unbounded_channel::<(SurfaceId, i32)>();
|
|
let router_for_exit = router_tx.clone();
|
|
tokio::spawn(async move {
|
|
while let Some((sid, code)) = exit_rx.recv().await {
|
|
let _ = router_for_exit.send(ServerMsg::Exit { surface_id: sid, code }).await;
|
|
}
|
|
});
|
|
|
|
let (state_tx, mut state_rx) = mpsc::unbounded_channel::<(SurfaceId, SurfaceState)>();
|
|
let router_for_state = router_tx.clone();
|
|
tokio::spawn(async move {
|
|
while let Some((sid, st)) = state_rx.recv().await {
|
|
let _ = router_for_state.send(ServerMsg::StateDetected { surface_id: sid, state: st }).await;
|
|
}
|
|
});
|
|
|
|
let snapshot_tx = spawn_writer(snapshot_store.clone());
|
|
|
|
// Periodic snapshot tick → router.
|
|
let tick_router = router_tx.clone();
|
|
let interval_secs = crate::config::Config::load().snapshot_interval_secs();
|
|
tokio::spawn(async move {
|
|
let mut tick = tokio::time::interval(Duration::from_secs(interval_secs));
|
|
tick.tick().await; // consume the immediate first tick
|
|
loop {
|
|
tick.tick().await;
|
|
if tick_router.send(ServerMsg::SnapshotTick).await.is_err() { break; }
|
|
}
|
|
});
|
|
|
|
let persister = persist::spawn(store.clone(), Duration::from_millis(500));
|
|
let initial = store.load().unwrap_or_default();
|
|
let event_persister = event_store::spawn(event_store.clone(), Duration::from_millis(500));
|
|
let event_initial = event_store.load().unwrap_or_default();
|
|
let started_at_ms = now_millis();
|
|
let shutdown = tokio::spawn(router(
|
|
router_rx, router_tx.clone(), exit_tx, state_tx,
|
|
persister, initial, event_persister, event_initial,
|
|
started_at_ms, snapshot_store, snapshot_tx,
|
|
));
|
|
|
|
let mut next_client: ClientId = 0;
|
|
loop {
|
|
let (stream, _addr) = listener.accept().await?;
|
|
let client_id = next_client;
|
|
next_client += 1;
|
|
let router_tx = router_tx.clone();
|
|
tokio::spawn(handle_client(stream, client_id, router_tx));
|
|
if shutdown.is_finished() {
|
|
break;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_client(stream: UnixStream, client_id: ClientId, router_tx: mpsc::Sender<ServerMsg>) {
|
|
let (mut read_half, mut write_half) = stream.into_split();
|
|
let (out_tx, mut out_rx) = mpsc::channel::<Envelope>(256);
|
|
|
|
let _ = router_tx
|
|
.send(ServerMsg::ClientConnected { client: client_id, out: out_tx.clone() })
|
|
.await;
|
|
|
|
// Writer task: drain outbound envelopes to the socket.
|
|
let writer = tokio::spawn(async move {
|
|
while let Some(env) = out_rx.recv().await {
|
|
if write_frame(&mut write_half, &env).await.is_err() {
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Reader loop: parse frames and forward requests to the router.
|
|
loop {
|
|
match read_frame(&mut read_half).await {
|
|
Ok(Some(Envelope::Req { id, cmd })) => {
|
|
let _ = router_tx
|
|
.send(ServerMsg::Request { id, cmd, client: client_id, out: out_tx.clone() })
|
|
.await;
|
|
}
|
|
Ok(Some(_)) => { /* clients don't send res/evt; ignore */ }
|
|
Ok(None) => break, // EOF
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
|
|
let _ = router_tx.send(ServerMsg::ClientDisconnected { client: client_id }).await;
|
|
writer.abort();
|
|
}
|
|
|
|
async fn router(
|
|
mut rx: mpsc::Receiver<ServerMsg>,
|
|
router_tx: mpsc::Sender<ServerMsg>,
|
|
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
|
|
state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
|
|
persister: Persister,
|
|
initial: crate::state_store::PersistState,
|
|
event_persister: EventPersister,
|
|
event_initial: crate::event_log::EventLogState,
|
|
started_at_ms: u64,
|
|
snapshot_store: Arc<dyn SnapshotStore>,
|
|
snapshot_tx: mpsc::UnboundedSender<SnapshotMsg>,
|
|
) {
|
|
let mut reg = Registry::new();
|
|
reg.restore(initial);
|
|
let mut event_log = EventLog::restore(event_initial, 1000);
|
|
let mut clients: HashMap<ClientId, ClientTx> = HashMap::new();
|
|
// surface_id → set of client ids subscribed (attached).
|
|
let mut subs: HashMap<SurfaceId, Vec<ClientId>> = HashMap::new();
|
|
let mut config = crate::config::Config::load();
|
|
|
|
while let Some(msg) = rx.recv().await {
|
|
match msg {
|
|
ServerMsg::ClientConnected { client, out } => {
|
|
clients.insert(client, out);
|
|
}
|
|
ServerMsg::ClientDisconnected { client } => {
|
|
clients.remove(&client);
|
|
for list in subs.values_mut() {
|
|
list.retain(|c| *c != client);
|
|
}
|
|
}
|
|
ServerMsg::Output { surface_id, bytes } => {
|
|
if let Some(list) = subs.get(&surface_id) {
|
|
let evt = Envelope::Evt(Evt::Output { surface_id: surface_id.clone(), bytes });
|
|
for c in list {
|
|
if let Some(out) = clients.get(c) {
|
|
let _ = out.try_send(evt.clone());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
ServerMsg::Exit { surface_id, code } => {
|
|
reg.mark_stopped(&surface_id);
|
|
reg.drop_state(&surface_id);
|
|
record_event(®, &mut event_log, &event_persister, &clients,
|
|
&surface_id, spacesh_proto::event::EventKind::Exit);
|
|
let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code });
|
|
broadcast_evt(&clients, &evt);
|
|
}
|
|
ServerMsg::StateDetected { surface_id, state } => {
|
|
if reg.is_running(&surface_id) {
|
|
reg.set_state(&surface_id, state);
|
|
broadcast_evt(&clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state }));
|
|
if let Some(kind) = kind_for_state(state) {
|
|
record_event(®, &mut event_log, &event_persister, &clients, &surface_id, kind);
|
|
}
|
|
}
|
|
}
|
|
ServerMsg::SnapshotTick => {
|
|
let ids = reg.live_ids();
|
|
for sid in ids {
|
|
let Some(handle) = reg.live(&sid) else { continue };
|
|
let (reply_tx, reply_rx) = oneshot::channel();
|
|
if handle.tx.send(SurfaceMsg::Snapshot { reply: reply_tx }).await.is_err() { continue; }
|
|
if let Ok((snap, dirty)) = reply_rx.await {
|
|
if dirty {
|
|
let _ = snapshot_tx.send(SnapshotMsg::Save(sid.clone(), snap));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
ServerMsg::Request { id, cmd, client, out } => {
|
|
handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients,
|
|
&router_tx, &exit_tx, &state_tx, &persister,
|
|
&mut event_log, &event_persister, started_at_ms, &mut config,
|
|
&snapshot_store, &snapshot_tx).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Offer `evt` to every connected client without blocking.
///
/// Each client gets its own clone; the result of `try_send` is ignored, so a
/// client whose send fails simply does not receive this event.
fn broadcast_evt(clients: &HashMap<ClientId, ClientTx>, evt: &Envelope) {
    clients.values().for_each(|sink| {
        let _ = sink.try_send(evt.clone());
    });
}
|
|
|
|
/// Milliseconds elapsed since the Unix epoch, or 0 if the system clock reads
/// earlier than the epoch.
///
/// The `as u64` narrowing is safe: epoch milliseconds fit in a `u64` for roughly
/// 584 million years.
fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
|
|
|
|
/// Which state transitions are worth logging. work/idle are noise → None.
|
|
fn kind_for_state(state: SurfaceState) -> Option<spacesh_proto::event::EventKind> {
|
|
use spacesh_proto::event::EventKind;
|
|
match state {
|
|
SurfaceState::Done => Some(EventKind::Done),
|
|
SurfaceState::Wait => Some(EventKind::Wait),
|
|
SurfaceState::Error => Some(EventKind::Error),
|
|
SurfaceState::Work | SurfaceState::Idle => None,
|
|
}
|
|
}
|
|
|
|
/// Record one event (denormalizing workspace name + agent label), persist, broadcast.
|
|
fn record_event(
|
|
reg: &Registry,
|
|
log: &mut EventLog,
|
|
persister: &EventPersister,
|
|
clients: &HashMap<ClientId, ClientTx>,
|
|
sid: &SurfaceId,
|
|
kind: spacesh_proto::event::EventKind,
|
|
) {
|
|
// No workspace → the surface was already removed (user-initiated Close / ApplyPreset /
|
|
// CloseWorkspace remove it synchronously before the async Exit arrives). Such deliberate
|
|
// closes are intentionally NOT logged — only spontaneous process exits and status
|
|
// transitions become events.
|
|
let Some(ws_id) = reg.workspace_of(sid) else { return };
|
|
let ws_name = reg.workspace(&ws_id).map(|w| w.name.clone()).unwrap_or_default();
|
|
let agent = reg.surface_spec(sid).and_then(|s| s.agent_label);
|
|
let rec = log.record(sid.clone(), ws_id, ws_name, agent, kind, now_millis());
|
|
persister.mark_dirty(log.snapshot());
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::Event { record: rec }));
|
|
}
|
|
|
|
fn ok(id: u64, data: serde_json::Value) -> Envelope {
|
|
Envelope::Res { id, ok: true, data, error: None }
|
|
}
|
|
fn err(id: u64, code: &str, msg: &str) -> Envelope {
|
|
Envelope::Res { id, ok: false, data: serde_json::Value::Null,
|
|
error: Some(ErrorBody { code: code.into(), msg: msg.into() }) }
|
|
}
|
|
|
|
/// Compute spawn env (hooks for claude agents, zsh integration for zsh shells)
|
|
/// and whether a deterministic hook source is active.
|
|
fn spawn_env(sid: &SurfaceId, spec: &spacesh_proto::workspace::SurfaceSpec) -> (Vec<(String, String)>, bool) {
|
|
let (mut env, active) = if crate::hooks::is_agent(&spec.command, spec.agent_label.as_deref()) {
|
|
let env = crate::hooks::prepare(sid, &crate::hooks::spacesh_bin());
|
|
let active = !env.is_empty();
|
|
(env, active)
|
|
} else if crate::hooks::is_zsh(&spec.command) {
|
|
(crate::hooks::shell_env(sid), false)
|
|
} else {
|
|
(vec![], false)
|
|
};
|
|
// Ensure the child sees the user's full PATH; the GUI/launchd-launched daemon
|
|
// otherwise can't find agents (claude/codex/gemini) and the panel exits at once.
|
|
if !env.iter().any(|(k, _)| k == "PATH") {
|
|
env.push(("PATH".to_string(), crate::config::enriched_path()));
|
|
}
|
|
(env, active)
|
|
}
|
|
|
|
/// Build the spawn spec for a (re)start. When `resume` and the command has a
|
|
/// resume mapping, its args are replaced with the resume args; otherwise the
|
|
/// original spec args are kept.
|
|
fn resume_spec(
|
|
spec: &spacesh_proto::workspace::SurfaceSpec,
|
|
resume: bool,
|
|
cfg: &crate::config::Config,
|
|
) -> spacesh_proto::workspace::SurfaceSpec {
|
|
let mut out = spec.clone();
|
|
if resume {
|
|
if let Some(args) = cfg.resume_args(&spec.command) {
|
|
out.args = args;
|
|
}
|
|
}
|
|
out
|
|
}
|
|
|
|
/// Emit a `layout_changed` event for a workspace's current tree.
|
|
fn emit_layout(reg: &Registry, ws_id: &WorkspaceId, clients: &HashMap<ClientId, ClientTx>) {
|
|
if let Some(w) = reg.workspace(ws_id) {
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::LayoutChanged {
|
|
workspace_id: ws_id.clone(), layout: w.layout.clone(),
|
|
}));
|
|
}
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
async fn handle_request(
|
|
id: u64,
|
|
cmd: Cmd,
|
|
client: ClientId,
|
|
out: ClientTx,
|
|
reg: &mut Registry,
|
|
subs: &mut HashMap<SurfaceId, Vec<ClientId>>,
|
|
clients: &HashMap<ClientId, ClientTx>,
|
|
router_tx: &mpsc::Sender<ServerMsg>,
|
|
exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>,
|
|
state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
|
|
persister: &Persister,
|
|
event_log: &mut EventLog,
|
|
event_persister: &EventPersister,
|
|
started_at_ms: u64,
|
|
config: &mut crate::config::Config,
|
|
snapshot_store: &Arc<dyn SnapshotStore>,
|
|
snapshot_tx: &mpsc::UnboundedSender<SnapshotMsg>,
|
|
) {
|
|
use spacesh_proto::message::SplitDir;
|
|
use spacesh_proto::layout::{LayoutNode, Orient};
|
|
use spacesh_proto::workspace::SurfaceSpec;
|
|
|
|
match cmd {
|
|
Cmd::Open { path } => {
|
|
let (ws_id, created) = reg.open_workspace(path.into());
|
|
if created {
|
|
if let Some(view) = reg.workspace_view(&ws_id) {
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view }));
|
|
}
|
|
persister.mark_dirty(reg.persist_state());
|
|
}
|
|
let _ = out.send(ok(id, serde_json::json!({ "workspace_id": ws_id.0 }))).await;
|
|
}
|
|
|
|
Cmd::NewSurface { workspace_id, command, args, cols, rows } => {
|
|
let Some(ws) = reg.workspace(&workspace_id).cloned() else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return;
|
|
};
|
|
let sid = reg.new_surface_id();
|
|
let shell = command.clone().unwrap_or_else(|| config.resolved_shell());
|
|
let spec = SurfaceSpec {
|
|
command: shell, args: args.clone(), cwd: ws.path.clone(),
|
|
agent_label: command, cols, rows, autostart: false,
|
|
};
|
|
let (env, hooks_active) = spawn_env(&sid, &spec);
|
|
match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) {
|
|
Ok(handle) => {
|
|
spawn_output_bridge(sid.clone(), &handle, router_tx.clone());
|
|
reg.set_live(handle);
|
|
reg.set_state(&sid, spacesh_proto::SurfaceState::Idle);
|
|
reg.add_surface_spec(&workspace_id, sid.clone(), spec);
|
|
// First panel of an empty workspace becomes the root leaf.
|
|
if let Some(w) = reg.workspace_mut(&workspace_id) {
|
|
if w.layout.is_none() {
|
|
w.layout = Some(LayoutNode::leaf(sid.clone()));
|
|
}
|
|
}
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated {
|
|
surface_id: sid.clone(), workspace_id: workspace_id.clone(),
|
|
}));
|
|
emit_layout(reg, &workspace_id, clients);
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::json!({ "surface_id": sid.0 }))).await;
|
|
}
|
|
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; }
|
|
}
|
|
}
|
|
|
|
Cmd::SplitSurface { surface_id, dir, command, args } => {
|
|
let Some(ws_id) = reg.workspace_of(&surface_id) else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
|
|
};
|
|
let ws = reg.workspace(&ws_id).cloned().unwrap();
|
|
let new_sid = reg.new_surface_id();
|
|
let shell = command.clone().unwrap_or_else(|| config.resolved_shell());
|
|
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
|
|
let (env, hooks_active) = spawn_env(&new_sid, &spec);
|
|
match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) {
|
|
Ok(handle) => {
|
|
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
|
|
reg.set_live(handle);
|
|
reg.set_state(&new_sid, spacesh_proto::SurfaceState::Idle);
|
|
reg.add_surface_spec(&ws_id, new_sid.clone(), spec);
|
|
let orient = match dir { SplitDir::Right => Orient::H, SplitDir::Down => Orient::V };
|
|
if let Some(w) = reg.workspace_mut(&ws_id) {
|
|
let mut root = w.layout.take().unwrap_or_else(|| LayoutNode::leaf(surface_id.clone()));
|
|
spacesh_core::ops::split_leaf(&mut root, &surface_id, orient, true, new_sid.clone());
|
|
w.layout = Some(root);
|
|
}
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: new_sid.clone(), workspace_id: ws_id.clone() }));
|
|
emit_layout(reg, &ws_id, clients);
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::json!({ "surface_id": new_sid.0 }))).await;
|
|
}
|
|
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; }
|
|
}
|
|
}
|
|
|
|
Cmd::SetRatios { workspace_id, node_path, ratios } => {
|
|
let ok_set = reg.workspace_mut(&workspace_id).map(|w| {
|
|
if let Some(l) = w.layout.as_mut() {
|
|
spacesh_core::ops::set_ratios(l, &node_path, &ratios)
|
|
} else { false }
|
|
}).unwrap_or(false);
|
|
if ok_set {
|
|
emit_layout(reg, &workspace_id, clients);
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
} else {
|
|
let _ = out.send(err(id, "BAD_REQUEST", "invalid node_path or ratios")).await;
|
|
}
|
|
}
|
|
|
|
Cmd::MoveSurface { surface_id, target_surface_id, edge } => {
|
|
let Some(ws_id) = reg.workspace_of(&surface_id) else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
|
|
};
|
|
if let Some(w) = reg.workspace_mut(&ws_id) {
|
|
if let Some(root) = w.layout.take() {
|
|
w.layout = Some(spacesh_core::ops::move_leaf(root, &surface_id, &target_surface_id, edge));
|
|
}
|
|
}
|
|
emit_layout(reg, &ws_id, clients);
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
|
|
Cmd::ApplyPreset { workspace_id, preset_id, slots } => {
|
|
let Some(count) = spacesh_core::presets::slot_count(&preset_id) else {
|
|
let _ = out.send(err(id, "BAD_REQUEST", "unknown preset")).await; return;
|
|
};
|
|
let Some(ws) = reg.workspace(&workspace_id).cloned() else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return;
|
|
};
|
|
// Kill current panels of this workspace.
|
|
let existing: Vec<SurfaceId> = ws.surfaces.keys().cloned().collect();
|
|
for sid in &existing {
|
|
if let Some(h) = reg.live(sid) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; }
|
|
reg.remove_surface(sid);
|
|
subs.remove(sid);
|
|
}
|
|
// Spawn `count` panels (slots padded/truncated to count).
|
|
let mut new_ids = Vec::new();
|
|
for i in 0..count {
|
|
let slot = slots.get(i);
|
|
let new_sid = reg.new_surface_id();
|
|
let command = slot.and_then(|s| s.command.clone());
|
|
let shell = command.clone().unwrap_or_else(|| config.resolved_shell());
|
|
let args = slot.map(|s| s.args.clone()).unwrap_or_default();
|
|
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
|
|
let (env, hooks_active) = spawn_env(&new_sid, &spec);
|
|
match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) {
|
|
Ok(handle) => {
|
|
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
|
|
reg.set_live(handle);
|
|
reg.set_state(&new_sid, spacesh_proto::SurfaceState::Idle);
|
|
reg.add_surface_spec(&workspace_id, new_sid.clone(), spec);
|
|
new_ids.push(new_sid);
|
|
}
|
|
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; return; }
|
|
}
|
|
}
|
|
if let Some(tree) = spacesh_core::presets::build(&preset_id, &new_ids) {
|
|
if let Some(w) = reg.workspace_mut(&workspace_id) { w.layout = Some(tree); }
|
|
}
|
|
for sid in &new_ids {
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceCreated { surface_id: sid.clone(), workspace_id: workspace_id.clone() }));
|
|
}
|
|
emit_layout(reg, &workspace_id, clients);
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::json!({ "surface_ids": new_ids.iter().map(|s| s.0.clone()).collect::<Vec<_>>() }))).await;
|
|
}
|
|
|
|
Cmd::RestartSurface { surface_id, resume } => {
|
|
if reg.is_running(&surface_id) {
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await; return; // already running
|
|
}
|
|
let Some(spec) = reg.surface_spec(&surface_id) else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
|
|
};
|
|
let spec = resume_spec(&spec, resume, config);
|
|
let ws_id = reg.workspace_of(&surface_id).unwrap();
|
|
let (env, hooks_active) = spawn_env(&surface_id, &spec);
|
|
match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) {
|
|
Ok(handle) => {
|
|
spawn_output_bridge(surface_id.clone(), &handle, router_tx.clone());
|
|
reg.set_live(handle);
|
|
reg.set_state(&surface_id, spacesh_proto::SurfaceState::Idle);
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceRestarted { surface_id: surface_id.clone() }));
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
Err(e) => { let _ = out.send(err(id, "SPAWN_FAILED", &e.to_string())).await; }
|
|
}
|
|
}
|
|
|
|
Cmd::CloseWorkspace { workspace_id } => {
|
|
let ids = reg.close_workspace(&workspace_id);
|
|
for sid in &ids { crate::hooks::cleanup(sid); crate::hooks::cleanup_shell(sid); subs.remove(sid); }
|
|
for sid in &ids { let _ = snapshot_tx.send(SnapshotMsg::Remove(sid.clone())); }
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceClosed { workspace_id: workspace_id.clone() }));
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
|
|
Cmd::SetWorkspaceMeta { workspace_id, name, group_id, unread, order, pinned } => {
|
|
let found = reg.workspace_mut(&workspace_id).map(|w| {
|
|
if let Some(n) = name { w.name = n; }
|
|
if let Some(g) = group_id { w.group_id = g; }
|
|
if let Some(u) = unread { w.unread = u; }
|
|
if let Some(o) = order { w.order = o; }
|
|
if let Some(p) = pinned { w.pinned = p; }
|
|
}).is_some();
|
|
if found {
|
|
if let Some(view) = reg.workspace_view(&workspace_id) {
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view }));
|
|
}
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
} else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await;
|
|
}
|
|
}
|
|
|
|
Cmd::CreateGroup { name, color } => {
|
|
let gid = reg.create_group(name, color);
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() }));
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::json!({ "group_id": gid.0 }))).await;
|
|
}
|
|
|
|
Cmd::SetGroup { group_id, name, color, order } => {
|
|
let found = reg.group_mut(&group_id).map(|g| {
|
|
if let Some(n) = name { g.name = n; }
|
|
if let Some(c) = color { g.color = c; }
|
|
if let Some(o) = order { g.order = o; }
|
|
}).is_some();
|
|
if found {
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() }));
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
} else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "group")).await;
|
|
}
|
|
}
|
|
|
|
Cmd::DeleteGroup { group_id } => {
|
|
reg.delete_group(&group_id);
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::GroupsChanged { groups: reg.groups() }));
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
|
|
Cmd::Input { surface_id, bytes } => {
|
|
let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&bytes) else {
|
|
let _ = out.send(err(id, "BAD_REQUEST", "invalid base64")).await; return;
|
|
};
|
|
if let Some(s) = reg.live(&surface_id) {
|
|
let _ = s.tx.send(crate::surface::SurfaceMsg::Input(decoded)).await;
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
} else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
|
}
|
|
}
|
|
|
|
Cmd::Resize { surface_id, cols, rows } => {
|
|
if let Some(s) = reg.live(&surface_id) {
|
|
let _ = s.tx.send(crate::surface::SurfaceMsg::Resize { cols, rows }).await;
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
} else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
|
}
|
|
}
|
|
|
|
Cmd::Attach { surface_id } => {
|
|
if let Some(s) = reg.live(&surface_id) {
|
|
let (reply_tx, reply_rx) = oneshot::channel();
|
|
if s.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.is_ok() {
|
|
if let Ok((snap, _sub)) = reply_rx.await {
|
|
subs.entry(surface_id.clone()).or_default().push(client);
|
|
let _ = out.send(ok(id, serde_json::json!({
|
|
"snapshot": snap.ansi, "cols": snap.cols, "rows": snap.rows,
|
|
"cursor_row": snap.cursor_row, "cursor_col": snap.cursor_col,
|
|
}))).await;
|
|
return;
|
|
}
|
|
}
|
|
let _ = out.send(err(id, "INTERNAL", "attach failed")).await;
|
|
} else {
|
|
// stopped panel: no live stream. Paint the last on-disk screen if we have one.
|
|
match snapshot_store.load(&surface_id) {
|
|
Some(snap) => {
|
|
let _ = out.send(ok(id, serde_json::json!({
|
|
"snapshot": snap.ansi, "cols": snap.cols, "rows": snap.rows,
|
|
"cursor_row": snap.cursor_row, "cursor_col": snap.cursor_col, "stopped": true,
|
|
}))).await;
|
|
}
|
|
None => {
|
|
let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0, "stopped": true }))).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Cmd::Detach { surface_id } => {
|
|
if let Some(list) = subs.get_mut(&surface_id) { list.retain(|c| *c != client); }
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
|
|
Cmd::Focus { surface_id } => {
|
|
let ids = event_log.mark_read(&spacesh_proto::event::MarkReadTarget::Surface(surface_id));
|
|
if !ids.is_empty() {
|
|
event_persister.mark_dirty(event_log.snapshot());
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids }));
|
|
}
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
|
|
Cmd::Close { surface_id } => {
|
|
if reg.surface_spec(&surface_id).is_some() {
|
|
if let Some(h) = reg.live(&surface_id) { let _ = h.tx.send(crate::surface::SurfaceMsg::Close).await; }
|
|
let ws_id = reg.workspace_of(&surface_id);
|
|
reg.remove_surface(&surface_id);
|
|
subs.remove(&surface_id);
|
|
crate::hooks::cleanup(&surface_id);
|
|
crate::hooks::cleanup_shell(&surface_id);
|
|
let _ = snapshot_tx.send(SnapshotMsg::Remove(surface_id.clone()));
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() }));
|
|
if let Some(ws_id) = ws_id {
|
|
emit_layout(reg, &ws_id, clients);
|
|
if let Some(view) = reg.workspace_view(&ws_id) {
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view }));
|
|
}
|
|
}
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
} else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
|
}
|
|
}
|
|
|
|
Cmd::SetState { surface_id, state } => {
|
|
if reg.is_running(&surface_id) {
|
|
reg.set_state(&surface_id, state);
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::State { surface_id: surface_id.clone(), state }));
|
|
if let Some(kind) = kind_for_state(state) {
|
|
record_event(reg, event_log, event_persister, clients, &surface_id, kind);
|
|
}
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
} else {
|
|
// unknown or stopped surface — status is only meaningful while running.
|
|
let _ = out.send(err(id, "NOT_FOUND", "surface not running")).await;
|
|
}
|
|
}
|
|
|
|
Cmd::SetZoom { workspace_id, surface_id } => {
|
|
let Some(w) = reg.workspace(&workspace_id) else {
|
|
let _ = out.send(err(id, "NOT_FOUND", "workspace")).await; return;
|
|
};
|
|
if let Some(sid) = &surface_id {
|
|
if !w.surfaces.contains_key(sid) {
|
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return;
|
|
}
|
|
}
|
|
reg.workspace_mut(&workspace_id).expect("workspace validated above").zoomed = surface_id.clone();
|
|
if let Some(view) = reg.workspace_view(&workspace_id) {
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceChanged { workspace: view }));
|
|
}
|
|
persister.mark_dirty(reg.persist_state());
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
|
|
Cmd::Health => {
|
|
let _ = out.send(ok(id, serde_json::json!({
|
|
"version": env!("CARGO_PKG_VERSION"),
|
|
"build": option_env!("SPACESH_BUILD").unwrap_or("dev"),
|
|
"pid": std::process::id(),
|
|
"started_at_ms": started_at_ms,
|
|
}))).await;
|
|
}
|
|
|
|
Cmd::WhichAgents { candidates } => {
|
|
let available: Vec<String> = candidates.into_iter().filter(|c| crate::config::is_installed(c)).collect();
|
|
let _ = out.send(ok(id, serde_json::json!({ "available": available }))).await;
|
|
}
|
|
|
|
Cmd::Status => {
|
|
let (groups, workspaces) = reg.status();
|
|
let _ = out.send(ok(id, serde_json::json!({ "groups": groups, "workspaces": workspaces }))).await;
|
|
}
|
|
|
|
Cmd::EventLog { limit } => {
|
|
let events = event_log.recent(limit);
|
|
let unread = event_log.unread_count();
|
|
let _ = out.send(ok(id, serde_json::json!({ "events": events, "unread": unread }))).await;
|
|
}
|
|
|
|
Cmd::MarkRead { target } => {
|
|
let ids = event_log.mark_read(&target);
|
|
if !ids.is_empty() {
|
|
event_persister.mark_dirty(event_log.snapshot());
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::EventsRead { ids }));
|
|
}
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
|
|
Cmd::ClearEvents => {
|
|
event_log.clear();
|
|
event_persister.mark_dirty(event_log.snapshot());
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::EventsCleared));
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
|
|
Cmd::Shutdown => {
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
std::process::exit(0);
|
|
}
|
|
|
|
Cmd::GetConfig => {
|
|
match serde_json::to_value(config.to_view()) {
|
|
Ok(v) => { let _ = out.send(ok(id, v)).await; }
|
|
Err(e) => { let _ = out.send(err(id, "INTERNAL", &e.to_string())).await; }
|
|
}
|
|
}
|
|
|
|
Cmd::SetConfig { default_shell, font_family, font_size, theme, accent } => {
|
|
if let Some(v) = &theme {
|
|
if v != "dark" && v != "light" { let _ = out.send(err(id, "BAD_CONFIG", "theme")).await; return; }
|
|
}
|
|
if let Some(v) = &accent {
|
|
const ACCENTS: [&str; 5] = ["blue", "teal", "purple", "green", "orange"];
|
|
if !ACCENTS.contains(&v.as_str()) { let _ = out.send(err(id, "BAD_CONFIG", "accent")).await; return; }
|
|
}
|
|
let mut next = config.clone();
|
|
if let Some(v) = default_shell { next.default_shell = if v.is_empty() { None } else { Some(v) }; }
|
|
if let Some(v) = font_family { next.terminal.font_family = if v.is_empty() { None } else { Some(v) }; }
|
|
if let Some(v) = font_size { next.terminal.font_size = Some(v.clamp(10, 20)); }
|
|
if let Some(v) = theme { next.appearance.theme = Some(v); }
|
|
if let Some(v) = accent { next.appearance.accent = Some(v); }
|
|
let to_save = next.clone();
|
|
match tokio::task::spawn_blocking(move || to_save.save()).await {
|
|
Ok(Ok(())) => {
|
|
*config = next;
|
|
let view = config.to_view();
|
|
broadcast_evt(clients, &Envelope::Evt(Evt::ConfigChanged { config: view }));
|
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
|
}
|
|
Ok(Err(e)) => { let _ = out.send(err(id, "SAVE_FAILED", &e.to_string())).await; }
|
|
Err(e) => { let _ = out.send(err(id, "SAVE_FAILED", &e.to_string())).await; }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Pump a surface's broadcast output into the router as `ServerMsg::Output`.
|
|
fn spawn_output_bridge(
|
|
surface_id: SurfaceId,
|
|
handle: &crate::surface::SurfaceHandle,
|
|
router_tx: mpsc::Sender<ServerMsg>,
|
|
) {
|
|
let tx = handle.tx.clone();
|
|
tokio::spawn(async move {
|
|
// Ask the actor for a subscription receiver.
|
|
let (reply_tx, reply_rx) = oneshot::channel();
|
|
if tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.is_err() {
|
|
return;
|
|
}
|
|
let Ok(mut sub) = reply_rx.await else { return };
|
|
loop {
|
|
match sub.recv().await {
|
|
Ok(bytes) => {
|
|
if router_tx.send(ServerMsg::Output { surface_id: surface_id.clone(), bytes }).await.is_err() {
|
|
break;
|
|
}
|
|
}
|
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
|
Err(_) => break, // surface closed
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use base64::Engine;
|
|
use crate::snapshot_store::NullSnapshotStore;
|
|
|
|
async fn req(stream: &mut UnixStream, id: u64, cmd: Cmd) -> Envelope {
|
|
write_frame(stream, &Envelope::Req { id, cmd }).await.unwrap();
|
|
// Read until we see the matching res (skip interleaved evts).
|
|
loop {
|
|
let env = read_frame(stream).await.unwrap().unwrap();
|
|
if let Envelope::Res { id: rid, .. } = &env {
|
|
if *rid == id { return env; }
|
|
}
|
|
}
|
|
}
|
|
|
|
fn res_data(env: &Envelope) -> &serde_json::Value {
|
|
match env { Envelope::Res { data, .. } => data, _ => panic!("not a res") }
|
|
}
|
|
|
|
/// Creates a brand-new, empty directory under the system temp dir and returns its path.
///
/// The directory is named `spaceshd-test-<nanoseconds since the Unix epoch>`. It is
/// created with `create_dir`, which fails when the path already exists, so a name
/// collision (coarse clock, a concurrent caller, a leftover directory) is retried with a
/// later timestamp instead of silently handing two callers the same directory.
///
/// Panics if the system clock is before the Unix epoch or the directory cannot be created
/// for any reason other than "already exists".
fn tempdir_path() -> std::path::PathBuf {
    let root = std::env::temp_dir();
    // `create_dir` does not create missing parents, so make sure the root is there.
    std::fs::create_dir_all(&root).unwrap();
    loop {
        let n = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let p = root.join(format!("spaceshd-test-{n}"));
        match std::fs::create_dir(&p) {
            Ok(()) => return p,
            // The name is taken: loop and try again with a fresh timestamp.
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue,
            Err(e) => panic!("failed to create {}: {e}", p.display()),
        }
    }
}
|
|
|
|
/// Build an event store whose file lives inside the per-test temp dir so it is
|
|
/// cleaned up with the rest of the test fixtures (not left in the global temp root).
|
|
fn make_event_store(dir: &Path) -> std::sync::Arc<dyn crate::event_store::EventStore> {
|
|
std::sync::Arc::new(crate::event_store::JsonEventStore::new(dir.join("events.json")))
|
|
}
|
|
|
|
async fn wait_for_socket(sock: &Path) {
|
|
for _ in 0..300 {
|
|
if UnixStream::connect(sock).await.is_ok() { return; }
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
|
|
}
|
|
panic!("socket never came up");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn open_new_surface_attach_streams_output() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
|
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "printf STREAM_OK; sleep 0.5".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
let surface_id = spacesh_proto::SurfaceId(sid);
|
|
|
|
let _ = req(&mut s, 3, Cmd::Attach { surface_id: surface_id.clone() }).await;
|
|
|
|
// Now read frames looking for an Output evt containing STREAM_OK.
|
|
let mut got = String::new();
|
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(6);
|
|
while tokio::time::Instant::now() < deadline {
|
|
if let Ok(Ok(Some(Envelope::Evt(Evt::Output { bytes, .. })))) =
|
|
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await {
|
|
got.push_str(&String::from_utf8_lossy(&bytes));
|
|
if got.contains("STREAM_OK") { break; }
|
|
}
|
|
}
|
|
assert!(got.contains("STREAM_OK"), "got: {got:?}");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn unknown_surface_returns_not_found() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut s, 1, Cmd::Input {
|
|
surface_id: spacesh_proto::SurfaceId("s_nope".into()),
|
|
bytes: base64::engine::general_purpose::STANDARD.encode(b"x"),
|
|
}).await;
|
|
match r {
|
|
Envelope::Res { ok, error, .. } => {
|
|
assert!(!ok);
|
|
assert_eq!(error.unwrap().code, "NOT_FOUND");
|
|
}
|
|
_ => panic!(),
|
|
}
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn reattach_returns_snapshot_with_prior_output() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
// First client: open, new surface that prints a marker, attach, then disconnect.
|
|
let surface_id;
|
|
{
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "printf REPAINT_ME; sleep 2".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
surface_id = spacesh_proto::SurfaceId(res_data(&r)["surface_id"].as_str().unwrap().to_string());
|
|
// Give the actor time to flush output into the grid.
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
|
|
// disconnect by dropping `s`
|
|
}
|
|
|
|
// Second client: attach to the same surface, expect snapshot to contain the marker.
|
|
// Re-verify the socket is still up before connecting (handles any scheduling jitter).
|
|
wait_for_socket(&sock).await;
|
|
let mut s2 = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut s2, 1, Cmd::Attach { surface_id: surface_id.clone() }).await;
|
|
let snap = res_data(&r)["snapshot"].as_str().unwrap();
|
|
assert!(snap.contains("REPAINT_ME"), "snapshot was: {snap:?}");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn apply_preset_builds_tree_and_status_reports_it() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock2 = sock.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut s, 2, Cmd::ApplyPreset {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
preset_id: "2x2".into(),
|
|
slots: vec![],
|
|
}).await;
|
|
let ids = res_data(&r)["surface_ids"].as_array().unwrap();
|
|
assert_eq!(ids.len(), 4);
|
|
|
|
let r = req(&mut s, 3, Cmd::Status).await;
|
|
let wss = res_data(&r)["workspaces"].as_array().unwrap();
|
|
let w0 = wss.iter().find(|w| w["id"] == ws).unwrap();
|
|
assert!(w0["layout"].is_object(), "layout tree present");
|
|
assert!(w0["layout"].to_string().contains("split"));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn set_state_updates_status_and_emits_event() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock2 = sock.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "sleep 1".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
let surface_id = spacesh_proto::SurfaceId(sid.clone());
|
|
|
|
// set_state on the running surface
|
|
let r = req(&mut s, 3, Cmd::SetState { surface_id: surface_id.clone(), state: spacesh_proto::status::SurfaceState::Work }).await;
|
|
assert!(matches!(r, Envelope::Res { ok: true, .. }));
|
|
|
|
// status reflects it
|
|
let r = req(&mut s, 4, Cmd::Status).await;
|
|
let wss = res_data(&r)["workspaces"].as_array().unwrap();
|
|
let w0 = wss.iter().find(|w| w["id"] == ws).unwrap();
|
|
assert_eq!(w0["surfaces"][&sid]["state"], "work");
|
|
|
|
// unknown surface -> NOT_FOUND
|
|
let r = req(&mut s, 5, Cmd::SetState { surface_id: spacesh_proto::SurfaceId("s_nope".into()), state: spacesh_proto::status::SurfaceState::Done }).await;
|
|
match r { Envelope::Res { ok, error, .. } => { assert!(!ok); assert_eq!(error.unwrap().code, "NOT_FOUND"); }, _ => panic!() }
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn cold_restart_restores_structure_stopped() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let state_path = dir.join("state.json");
|
|
let sock = dir.join("sock");
|
|
let ws;
|
|
{
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
|
// Both daemon instances in this test share ONE event-store file under the
|
|
// per-test dir so instance B reads from disk what instance A persisted.
|
|
let event_store = make_event_store(&dir);
|
|
let sock2 = sock.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
req(&mut s, 2, Cmd::ApplyPreset {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()), preset_id: "2tb".into(), slots: vec![],
|
|
}).await;
|
|
// allow debounce (500ms) to flush state.json
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(900)).await;
|
|
}
|
|
// "cold start": new store on the same state file, new socket.
|
|
let sock_b = dir.join("sock2");
|
|
let store_b: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
|
let event_store_b = make_event_store(&dir);
|
|
let sb2 = sock_b.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sb2).await;
|
|
let mut s2 = UnixStream::connect(&sb2).await.unwrap();
|
|
let r = req(&mut s2, 1, Cmd::Status).await;
|
|
let wss = res_data(&r)["workspaces"].as_array().unwrap();
|
|
let w0 = wss.iter().find(|w| w["id"] == ws).expect("workspace restored");
|
|
let surfaces = w0["surfaces"].as_object().unwrap();
|
|
assert_eq!(surfaces.len(), 2, "2tb panels restored");
|
|
for (_id, sv) in surfaces {
|
|
assert_eq!(sv["running"], false, "restored panels are stopped");
|
|
}
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn osc133_in_pty_sets_status_over_socket() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock2 = sock.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "printf '\\033]133;C\\007'; printf hi; printf '\\033]133;D;0\\007'; sleep 1".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
let surface_id = spacesh_proto::SurfaceId(sid.clone());
|
|
let _ = req(&mut s, 3, Cmd::Attach { surface_id }).await;
|
|
|
|
// Wait for a State event to flow (Work then Done).
|
|
let mut saw_done = false;
|
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
|
|
while tokio::time::Instant::now() < deadline {
|
|
if let Ok(Ok(Some(Envelope::Evt(Evt::State { state, .. })))) =
|
|
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await {
|
|
if state == spacesh_proto::status::SurfaceState::Done { saw_done = true; break; }
|
|
}
|
|
}
|
|
assert!(saw_done, "expected a Done state event from OSC 133");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn set_state_done_emits_event_record() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
// Control connection: open workspace and spawn surface.
|
|
let mut ctrl = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut ctrl, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "sleep 5".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
|
|
// Observer connection: a second client that receives all broadcast events
|
|
// without its read loop consuming them via req().
|
|
let mut observer = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
// Trigger Done via SetState on the control connection.
|
|
let _ = req(&mut ctrl, 3, Cmd::SetState {
|
|
surface_id: spacesh_proto::SurfaceId(sid.clone()),
|
|
state: spacesh_proto::status::SurfaceState::Done,
|
|
}).await;
|
|
|
|
// Expect an Evt::Event for this surface on the observer within a short window.
|
|
let mut found = None;
|
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
|
|
while tokio::time::Instant::now() < deadline {
|
|
if let Ok(Ok(Some(env))) =
|
|
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await {
|
|
if let Envelope::Evt(Evt::Event { record }) = env {
|
|
if record.surface_id.0 == sid { found = Some(record); break; }
|
|
}
|
|
}
|
|
}
|
|
let rec = found.expect("expected an Evt::Event for the surface");
|
|
assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done);
|
|
assert!(!rec.read);
|
|
assert_eq!(rec.workspace_id.0, ws);
|
|
assert!(!rec.workspace_name.is_empty(), "workspace name should be denormalized into the record");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn close_does_not_emit_event_record() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
// Control connection: open workspace and spawn surface.
|
|
let mut ctrl = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut ctrl, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "sleep 5".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
|
|
// Observer connection: receives all broadcast events.
|
|
let mut observer = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
// User-initiated Close on the control connection.
|
|
let _ = req(&mut ctrl, 3, Cmd::Close {
|
|
surface_id: spacesh_proto::SurfaceId(sid.clone()),
|
|
}).await;
|
|
|
|
// A deliberate Close must surface an Evt::Exit but NEVER an Evt::Event for it.
|
|
let mut saw_exit = false;
|
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2);
|
|
while tokio::time::Instant::now() < deadline {
|
|
if let Ok(Ok(Some(env))) =
|
|
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await {
|
|
match env {
|
|
Envelope::Evt(Evt::Event { record }) if record.surface_id.0 == sid => {
|
|
panic!("user-initiated Close must not produce an Evt::Event");
|
|
}
|
|
Envelope::Evt(Evt::Exit { surface_id, .. }) if surface_id.0 == sid => {
|
|
saw_exit = true;
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
assert!(saw_exit, "expected an Evt::Exit for the closed surface");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn osc133_detected_state_emits_event_record() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock2 = sock.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
// Control connection: open workspace and spawn a surface that emits OSC 133.
|
|
let mut ctrl = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut ctrl, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "printf '\\033]133;C\\007'; printf hi; printf '\\033]133;D;0\\007'; sleep 1".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
|
|
// Observer connection: receives all broadcast events (the detected-state path
|
|
// flows through ServerMsg::StateDetected → record_event → Evt::Event).
|
|
let mut observer = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
// Drive the PTY output by attaching the control connection.
|
|
let _ = req(&mut ctrl, 3, Cmd::Attach {
|
|
surface_id: spacesh_proto::SurfaceId(sid.clone()),
|
|
}).await;
|
|
|
|
// Expect an Evt::Event (kind=done) for this surface from the OSC 133 Done detection.
|
|
let mut found = None;
|
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
|
|
while tokio::time::Instant::now() < deadline {
|
|
if let Ok(Ok(Some(env))) =
|
|
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await {
|
|
if let Envelope::Evt(Evt::Event { record }) = env {
|
|
if record.surface_id.0 == sid { found = Some(record); break; }
|
|
}
|
|
}
|
|
}
|
|
let rec = found.expect("expected an Evt::Event from the OSC 133 detected state");
|
|
assert_eq!(rec.kind, spacesh_proto::event::EventKind::Done);
|
|
assert!(!rec.read);
|
|
assert_eq!(rec.workspace_id.0, ws);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn event_log_query_and_mark_read() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
// Observer connection to catch the EventsRead broadcast.
|
|
let mut obs = UnixStream::connect(&sock).await.unwrap();
|
|
// Control connection.
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "sleep 5".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
|
|
let _ = req(&mut s, 3, Cmd::SetState {
|
|
surface_id: spacesh_proto::SurfaceId(sid.clone()),
|
|
state: spacesh_proto::status::SurfaceState::Error,
|
|
}).await;
|
|
|
|
// Query the log.
|
|
let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await;
|
|
let data = res_data(&log);
|
|
assert_eq!(data["unread"].as_u64().unwrap(), 1);
|
|
let first_id = data["events"][0]["id"].as_u64().unwrap();
|
|
|
|
// Mark it read by id.
|
|
let _ = req(&mut s, 5, Cmd::MarkRead {
|
|
target: spacesh_proto::event::MarkReadTarget::Ids(vec![first_id]),
|
|
}).await;
|
|
|
|
// Observer should see EventsRead { ids: [first_id] }.
|
|
let mut saw_read = false;
|
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
|
|
while tokio::time::Instant::now() < deadline {
|
|
if let Ok(Ok(Some(env))) =
|
|
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await {
|
|
if let Envelope::Evt(Evt::EventsRead { ids }) = env {
|
|
if ids == vec![first_id] { saw_read = true; break; }
|
|
}
|
|
}
|
|
}
|
|
assert!(saw_read, "expected an EventsRead broadcast for the marked id");
|
|
|
|
// Unread is now 0.
|
|
let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await;
|
|
assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn event_log_persists_across_daemon_restart() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let state_path = dir.join("state.json");
|
|
let sock = dir.join("sock");
|
|
|
|
let event_id: u64;
|
|
let ws_id: String;
|
|
|
|
// ── Instance A ────────────────────────────────────────────────────────
|
|
{
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
|
let event_store = make_event_store(&dir);
|
|
let sock2 = sock.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
// Open workspace, spawn surface.
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
ws_id = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
|
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws_id.clone()),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "sleep 5".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
|
|
// Drive an Error state → one unread event is logged.
|
|
let _ = req(&mut s, 3, Cmd::SetState {
|
|
surface_id: spacesh_proto::SurfaceId(sid.clone()),
|
|
state: spacesh_proto::status::SurfaceState::Error,
|
|
}).await;
|
|
|
|
// Query and assert unread == 1 before restart.
|
|
let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await;
|
|
let data = res_data(&log);
|
|
assert_eq!(data["unread"].as_u64().unwrap(), 1, "instance A: expected 1 unread event");
|
|
assert_eq!(data["events"][0]["kind"].as_str().unwrap(), "error");
|
|
assert_eq!(data["events"][0]["workspace_id"].as_str().unwrap(), ws_id);
|
|
event_id = data["events"][0]["id"].as_u64().unwrap();
|
|
|
|
// Wait comfortably longer than the 500 ms debounce so events.json is flushed.
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(900)).await;
|
|
// Drop `s` (and instance A's task) by falling out of scope.
|
|
}
|
|
|
|
// ── Instance B (same dir, fresh socket path) ──────────────────────────
|
|
let sock_b = dir.join("sock2");
|
|
let store_b: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
|
let event_store_b = make_event_store(&dir);
|
|
let sb2 = sock_b.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sb2).await;
|
|
|
|
let mut s2 = UnixStream::connect(&sb2).await.unwrap();
|
|
|
|
// Query event log on instance B — the persisted event must survive the restart.
|
|
let log = req(&mut s2, 1, Cmd::EventLog { limit: None }).await;
|
|
let data = res_data(&log);
|
|
assert_eq!(data["unread"].as_u64().unwrap(), 1,
|
|
"instance B: event log unread count must survive cold restart");
|
|
assert_eq!(data["events"][0]["id"].as_u64().unwrap(), event_id,
|
|
"instance B: event id must match");
|
|
assert_eq!(data["events"][0]["kind"].as_str().unwrap(), "error",
|
|
"instance B: event kind must be 'error'");
|
|
assert_eq!(data["events"][0]["workspace_id"].as_str().unwrap(), ws_id,
|
|
"instance B: workspace_id must match");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn focus_marks_surface_events_read() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
// Observer connection.
|
|
let mut obs = UnixStream::connect(&sock).await.unwrap();
|
|
// Control connection.
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "sleep 5".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
|
|
// Drive an Error event for this surface.
|
|
let _ = req(&mut s, 3, Cmd::SetState {
|
|
surface_id: spacesh_proto::SurfaceId(sid.clone()),
|
|
state: spacesh_proto::status::SurfaceState::Error,
|
|
}).await;
|
|
|
|
// Verify unread == 1 before Focus.
|
|
let log = req(&mut s, 4, Cmd::EventLog { limit: None }).await;
|
|
assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 1);
|
|
let first_id = res_data(&log)["events"][0]["id"].as_u64().unwrap();
|
|
|
|
// Focus that surface — should mark its events read.
|
|
let _ = req(&mut s, 5, Cmd::Focus {
|
|
surface_id: spacesh_proto::SurfaceId(sid.clone()),
|
|
}).await;
|
|
|
|
// Observer should receive EventsRead with the id from the Error event.
|
|
let mut saw_read = false;
|
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3);
|
|
while tokio::time::Instant::now() < deadline {
|
|
if let Ok(Ok(Some(env))) =
|
|
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut obs)).await {
|
|
if let Envelope::Evt(Evt::EventsRead { ids }) = env {
|
|
if ids.contains(&first_id) { saw_read = true; break; }
|
|
}
|
|
}
|
|
}
|
|
assert!(saw_read, "expected an EventsRead broadcast after Focus");
|
|
|
|
// Unread drops to 0.
|
|
let log = req(&mut s, 6, Cmd::EventLog { limit: None }).await;
|
|
assert_eq!(res_data(&log)["unread"].as_u64().unwrap(), 0);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn health_reports_version_pid_started() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64;
|
|
let r = req(&mut s, 1, Cmd::Health).await;
|
|
let d = res_data(&r);
|
|
assert!(!d["version"].as_str().unwrap().is_empty());
|
|
assert!(d["pid"].as_u64().unwrap() > 0);
|
|
let started = d["started_at_ms"].as_u64().unwrap();
|
|
assert!(started > 0 && started >= now.saturating_sub(5000) && started <= now + 1000, "started_at_ms plausible: {started} vs now {now}");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn set_zoom_sets_and_clears_and_autoclears() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 5".into()], cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
|
|
let _ = req(&mut s, 3, Cmd::SetZoom {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
surface_id: Some(spacesh_proto::SurfaceId(sid.clone())),
|
|
}).await;
|
|
let st = req(&mut s, 4, Cmd::Status).await;
|
|
let w0 = res_data(&st)["workspaces"].as_array().unwrap().iter().find(|w| w["id"] == ws).unwrap().clone();
|
|
assert_eq!(w0["zoomed"], sid);
|
|
|
|
let _ = req(&mut s, 5, Cmd::SetZoom {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()), surface_id: None,
|
|
}).await;
|
|
let st = req(&mut s, 6, Cmd::Status).await;
|
|
let w0 = res_data(&st)["workspaces"].as_array().unwrap().iter().find(|w| w["id"] == ws).unwrap().clone();
|
|
assert!(w0["zoomed"].is_null());
|
|
|
|
let _ = req(&mut s, 7, Cmd::SetZoom {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
surface_id: Some(spacesh_proto::SurfaceId(sid.clone())),
|
|
}).await;
|
|
let _ = req(&mut s, 8, Cmd::Close { surface_id: spacesh_proto::SurfaceId(sid.clone()) }).await;
|
|
let st = req(&mut s, 9, Cmd::Status).await;
|
|
let w0 = res_data(&st)["workspaces"].as_array().unwrap().iter().find(|w| w["id"] == ws).unwrap().clone();
|
|
assert!(w0["zoomed"].is_null(), "closing the zoomed surface clears zoom");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn close_zoomed_broadcasts_workspace_changed() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
// Control connection: open, spawn, zoom.
|
|
let mut ctrl = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut ctrl, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut ctrl, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
command: Some("/bin/sh".into()), args: vec!["-c".into(), "sleep 5".into()], cols: 80, rows: 24,
|
|
}).await;
|
|
let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string();
|
|
let _ = req(&mut ctrl, 3, Cmd::SetZoom {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws.clone()),
|
|
surface_id: Some(spacesh_proto::SurfaceId(sid.clone())),
|
|
}).await;
|
|
|
|
// Observer connection: must be attached BEFORE the Close so it catches the broadcast.
|
|
let mut observer = UnixStream::connect(&sock).await.unwrap();
|
|
|
|
// Close the zoomed surface on the control connection.
|
|
let _ = req(&mut ctrl, 4, Cmd::Close { surface_id: spacesh_proto::SurfaceId(sid.clone()) }).await;
|
|
|
|
// Observer must receive a WorkspaceChanged for this workspace with zoomed == None.
|
|
let mut saw_cleared = false;
|
|
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2);
|
|
while tokio::time::Instant::now() < deadline {
|
|
if let Ok(Ok(Some(env))) =
|
|
tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut observer)).await {
|
|
if let Envelope::Evt(Evt::WorkspaceChanged { workspace }) = env {
|
|
if workspace.id.0 == ws {
|
|
assert!(workspace.zoomed.is_none(), "WorkspaceChanged must report cleared zoom");
|
|
saw_cleared = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
assert!(saw_cleared, "expected a WorkspaceChanged broadcast with cleared zoom after closing the zoomed surface");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn stopped_attach_returns_disk_snapshot() {
|
|
let _serial = crate::test_support::serial();
|
|
let dir = tempdir_path();
|
|
let sock = dir.join("sock");
|
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
|
let event_store = make_event_store(&dir);
|
|
// Use a real JsonSnapshotStore so the on-exit dump actually lands on disk.
|
|
let snap_dir = dir.join("snapshots");
|
|
let snapshot_store: std::sync::Arc<dyn crate::snapshot_store::SnapshotStore> =
|
|
std::sync::Arc::new(crate::snapshot_store::JsonSnapshotStore::new(snap_dir.clone()));
|
|
let sock_for_task = sock.clone();
|
|
let store2 = store.clone();
|
|
let snap_store2 = snapshot_store.clone();
|
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, snap_store2).await; });
|
|
wait_for_socket(&sock).await;
|
|
|
|
// Open workspace, spawn a surface that prints a unique marker then exits quickly.
|
|
let surface_id;
|
|
{
|
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
|
workspace_id: spacesh_proto::WorkspaceId(ws),
|
|
command: Some("/bin/sh".into()),
|
|
args: vec!["-c".into(), "printf SNAPDISK; sleep 0.2".into()],
|
|
cols: 80, rows: 24,
|
|
}).await;
|
|
surface_id = spacesh_proto::SurfaceId(res_data(&r)["surface_id"].as_str().unwrap().to_string());
|
|
// Give the process time to run, exit, and the actor to dump its snapshot to the writer.
|
|
tokio::time::sleep(tokio::time::Duration::from_millis(1500)).await;
|
|
// s drops here
|
|
}
|
|
|
|
// Re-verify the socket is still alive.
|
|
wait_for_socket(&sock).await;
|
|
|
|
// Fresh client: attach to the now-stopped surface.
|
|
let mut s2 = UnixStream::connect(&sock).await.unwrap();
|
|
let r = req(&mut s2, 1, Cmd::Attach { surface_id: surface_id.clone() }).await;
|
|
let data = res_data(&r);
|
|
assert_eq!(data["stopped"].as_bool(), Some(true), "surface should be stopped");
|
|
let snap = data["snapshot"].as_str().unwrap_or("");
|
|
assert!(snap.contains("SNAPDISK"), "on-disk snapshot should contain SNAPDISK, got: {snap:?}");
|
|
}
|
|
|
|
/// `resume_spec` with the default `Config`: args are left alone when `resume` is
/// false, become `--continue` for the `claude` command when `resume` is true, and
/// are left alone for the `bash` command even when `resume` is true.
#[test]
fn resume_spec_swaps_args_when_mapped() {
    use spacesh_proto::workspace::SurfaceSpec;

    let cfg = crate::config::Config::default();
    let spec = SurfaceSpec {
        command: "claude".into(),
        args: vec!["--foo".into()],
        cwd: "/tmp".into(),
        agent_label: Some("claude".into()),
        cols: 80,
        rows: 24,
        autostart: false,
    };

    // Not resuming: the original args pass through.
    let not_resumed = resume_spec(&spec, false, &cfg);
    assert_eq!(not_resumed.args, vec![String::from("--foo")]);

    // Resuming a command the default config maps: the resume args replace the originals.
    let resumed = resume_spec(&spec, true, &cfg);
    assert_eq!(resumed.args, vec![String::from("--continue")]);

    // Resuming a command with no mapping: falls back to the original args.
    let shell = SurfaceSpec { command: "bash".into(), ..spec.clone() };
    let resumed_shell = resume_spec(&shell, true, &cfg);
    assert_eq!(resumed_shell.args, shell.args);
}
|
|
}
|