From c35585755e3a7f8542c2044e446c3853ed56af14 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Tue, 9 Jun 2026 23:05:18 +0700 Subject: [PATCH] =?UTF-8?q?feat(daemon):=20actor=20OSC133/fallback=20detec?= =?UTF-8?q?tion=20=E2=86=92=20set=5Fstate,=20hook/shell=20spawn=20env,=20c?= =?UTF-8?q?leanup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/spaceshd/src/server.rs | 89 ++++++++++++++++++++++++--- crates/spaceshd/src/surface.rs | 106 +++++++++++++++++++++++++++------ 2 files changed, 170 insertions(+), 25 deletions(-) diff --git a/crates/spaceshd/src/server.rs b/crates/spaceshd/src/server.rs index 3d45149..062aac5 100644 --- a/crates/spaceshd/src/server.rs +++ b/crates/spaceshd/src/server.rs @@ -6,6 +6,7 @@ use anyhow::Result; use base64::Engine; use spacesh_proto::codec::{read_frame, write_frame}; use spacesh_proto::{Cmd, Envelope, ErrorBody, Evt, SurfaceId, WorkspaceId}; +use spacesh_proto::status::SurfaceState; use tokio::net::{UnixListener, UnixStream}; use tokio::sync::{mpsc, oneshot}; use crate::persist::{self, Persister}; @@ -28,6 +29,8 @@ enum ServerMsg { ClientConnected { client: ClientId, out: ClientTx }, /// Drop a client and all its subscriptions. ClientDisconnected { client: ClientId }, + /// A status change detected internally (OSC 133 / fallback) by a surface actor. + StateDetected { surface_id: SurfaceId, state: SurfaceState }, } type ClientId = u64; @@ -45,9 +48,17 @@ pub async fn serve(socket: &Path, store: Arc) -> Result<()> { } }); + let (state_tx, mut state_rx) = mpsc::unbounded_channel::<(SurfaceId, SurfaceState)>(); + let router_for_state = router_tx.clone(); + tokio::spawn(async move { + while let Some((sid, st)) = state_rx.recv().await { + let _ = router_for_state.send(ServerMsg::StateDetected { surface_id: sid, state: st }).await; + } + }); + let persister = persist::spawn(store.clone(), Duration::from_millis(500)); let initial = store.load().unwrap_or_default(); - let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx, persister, initial)); + let shutdown = tokio::spawn(router(router_rx, router_tx.clone(), exit_tx, state_tx, persister, initial)); let mut next_client: ClientId = 0; loop { @@ -102,6 +113,7 @@ async fn router( mut rx: mpsc::Receiver, router_tx: mpsc::Sender, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, + state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, persister: Persister, initial: crate::state_store::PersistState, ) { @@ -138,8 +150,14 @@ async fn router( let evt = Envelope::Evt(Evt::Exit { surface_id: surface_id.clone(), code }); broadcast_evt(&clients, &evt); } + ServerMsg::StateDetected { surface_id, state } => { + if reg.is_running(&surface_id) { + reg.set_state(&surface_id, state); + broadcast_evt(&clients, &Envelope::Evt(Evt::State { surface_id, state })); + } + } ServerMsg::Request { id, cmd, client, out } => { - handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &persister).await; + handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients, &router_tx, &exit_tx, &state_tx, &persister).await; } } } @@ -159,6 +177,20 @@ fn err(id: u64, code: &str, msg: &str) -> Envelope { error: Some(ErrorBody { code: code.into(), msg: msg.into() }) } } +/// Compute spawn env (hooks for claude agents, zsh integration for zsh shells) +/// and whether a deterministic hook source is active. +fn spawn_env(sid: &SurfaceId, spec: &spacesh_proto::workspace::SurfaceSpec) -> (Vec<(String, String)>, bool) { + if crate::hooks::is_agent(&spec.command, spec.agent_label.as_deref()) { + let env = crate::hooks::prepare(sid, &crate::hooks::spacesh_bin()); + let active = !env.is_empty(); + (env, active) + } else if crate::hooks::is_zsh(&spec.command) { + (crate::hooks::shell_env(sid), false) + } else { + (vec![], false) + } +} + /// Emit a `layout_changed` event for a workspace's current tree. fn emit_layout(reg: &Registry, ws_id: &WorkspaceId, clients: &HashMap) { if let Some(w) = reg.workspace(ws_id) { @@ -179,6 +211,7 @@ async fn handle_request( clients: &HashMap, router_tx: &mpsc::Sender, exit_tx: &mpsc::UnboundedSender<(SurfaceId, i32)>, + state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, persister: &Persister, ) { use spacesh_proto::message::SplitDir; @@ -207,7 +240,8 @@ async fn handle_request( command: shell, args: args.clone(), cwd: ws.path.clone(), agent_label: command, cols, rows, autostart: false, }; - match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, exit_tx.clone()) { + let (env, hooks_active) = spawn_env(&sid, &spec); + match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { Ok(handle) => { spawn_output_bridge(sid.clone(), &handle, router_tx.clone()); reg.set_live(handle); @@ -238,7 +272,8 @@ async fn handle_request( let new_sid = reg.new_surface_id(); let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into())); let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false }; - match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, exit_tx.clone()) { + let (env, hooks_active) = spawn_env(&new_sid, &spec); + match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { Ok(handle) => { spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone()); reg.set_live(handle); @@ -311,7 +346,8 @@ async fn handle_request( let shell = command.clone().unwrap_or_else(|| std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".into())); let args = slot.map(|s| s.args.clone()).unwrap_or_default(); let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false }; - match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, exit_tx.clone()) { + let (env, hooks_active) = spawn_env(&new_sid, &spec); + match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { Ok(handle) => { spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone()); reg.set_live(handle); @@ -341,7 +377,8 @@ async fn handle_request( let _ = out.send(err(id, "NOT_FOUND", "surface")).await; return; }; let ws_id = reg.workspace_of(&surface_id).unwrap(); - match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, exit_tx.clone()) { + let (env, hooks_active) = spawn_env(&surface_id, &spec); + match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) { Ok(handle) => { spawn_output_bridge(surface_id.clone(), &handle, router_tx.clone()); reg.set_live(handle); @@ -355,7 +392,7 @@ async fn handle_request( Cmd::CloseWorkspace { workspace_id } => { let ids = reg.close_workspace(&workspace_id); - for sid in &ids { subs.remove(sid); } + for sid in &ids { crate::hooks::cleanup(sid); crate::hooks::cleanup_shell(sid); subs.remove(sid); } broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceClosed { workspace_id: workspace_id.clone() })); persister.mark_dirty(reg.persist_state()); let _ = out.send(ok(id, serde_json::Value::Null)).await; @@ -462,6 +499,8 @@ async fn handle_request( let ws_id = reg.workspace_of(&surface_id); reg.remove_surface(&surface_id); subs.remove(&surface_id); + crate::hooks::cleanup(&surface_id); + crate::hooks::cleanup_shell(&surface_id); broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() })); if let Some(ws_id) = ws_id { emit_layout(reg, &ws_id, clients); } persister.mark_dirty(reg.persist_state()); @@ -769,4 +808,40 @@ mod tests { assert_eq!(sv["running"], false, "restored panels are stopped"); } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn osc133_in_pty_sets_status_over_socket() { + let _serial = crate::test_support::serial(); + let dir = tempdir_path(); + let sock = dir.join("sock"); + let store: std::sync::Arc = + std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json"))); + let sock2 = sock.clone(); + tokio::spawn(async move { let _ = serve(&sock2, store).await; }); + wait_for_socket(&sock).await; + let mut s = UnixStream::connect(&sock).await.unwrap(); + + let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await; + let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string(); + let r = req(&mut s, 2, Cmd::NewSurface { + workspace_id: spacesh_proto::WorkspaceId(ws.clone()), + command: Some("/bin/sh".into()), + args: vec!["-c".into(), "printf '\\033]133;C\\007'; printf hi; printf '\\033]133;D;0\\007'; sleep 1".into()], + cols: 80, rows: 24, + }).await; + let sid = res_data(&r)["surface_id"].as_str().unwrap().to_string(); + let surface_id = spacesh_proto::SurfaceId(sid.clone()); + let _ = req(&mut s, 3, Cmd::Attach { surface_id }).await; + + // Wait for a State event to flow (Work then Done). + let mut saw_done = false; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + if let Ok(Ok(Some(Envelope::Evt(Evt::State { state, .. })))) = + tokio::time::timeout(tokio::time::Duration::from_millis(200), read_frame(&mut s)).await { + if state == spacesh_proto::status::SurfaceState::Done { saw_done = true; break; } + } + } + assert!(saw_done, "expected a Done state event from OSC 133"); + } } diff --git a/crates/spaceshd/src/surface.rs b/crates/spaceshd/src/surface.rs index b1a9620..a81e345 100644 --- a/crates/spaceshd/src/surface.rs +++ b/crates/spaceshd/src/surface.rs @@ -1,6 +1,8 @@ use spacesh_core::{snapshot::snapshot_ansi, GridSurface}; use spacesh_core::snapshot::Snapshot; +use spacesh_core::detect::{FallbackScanner, Osc133Scanner}; use spacesh_proto::{SurfaceId, WorkspaceId}; +use spacesh_proto::status::SurfaceState; use spacesh_proto::workspace::SurfaceSpec; use spacesh_pty::{PtyHandle, SpawnSpec}; use tokio::sync::{broadcast, mpsc, oneshot}; @@ -12,18 +14,23 @@ pub fn spawn_from_spec( id: SurfaceId, workspace_id: WorkspaceId, spec: &SurfaceSpec, + extra_env: Vec<(String, String)>, + hooks_active: bool, + state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, ) -> std::io::Result { + let mut env = vec![("SPACESH_SURFACE_ID".to_string(), id.0.clone())]; + env.extend(extra_env); let pty = PtyHandle::spawn(SpawnSpec { command: spec.command.clone(), args: spec.args.clone(), cwd: std::path::PathBuf::from(&spec.cwd), cols: spec.cols, rows: spec.rows, - env: vec![("SPACESH_SURFACE_ID".into(), id.0.clone())], + env, }) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; - Ok(spawn_surface(id, workspace_id, pty, spec.cols, spec.rows, exit_tx)) + Ok(spawn_surface(id, workspace_id, pty, spec.cols, spec.rows, hooks_active, state_tx, exit_tx)) } const BROADCAST_CAP: usize = 1024; @@ -52,18 +59,25 @@ pub fn spawn_surface( mut pty: PtyHandle, cols: u16, rows: u16, + hooks_active: bool, + state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, ) -> SurfaceHandle { let (tx, mut rx) = mpsc::channel::(64); let (bcast, _) = broadcast::channel::>(BROADCAST_CAP); let actor_id = id.clone(); + let detect_id = id.clone(); tokio::spawn(async move { let mut grid = GridSurface::new(cols, rows); let mut pending: Vec = Vec::with_capacity(FLUSH_BYTES); let mut flush_deadline: Option = None; + let mut osc = Osc133Scanner::new(); + // `deterministic` suppresses fallback once a reliable source is seen + // (hooks active, or any OSC 133 marker observed). + let mut deterministic = hooks_active; + let mut last_state = SurfaceState::Idle; - // Helper closure can't borrow across awaits cleanly; inline the flush logic. loop { // Copy the deadline into an owned local so the timer future doesn't // hold a borrow of `flush_deadline` across the select! (other arms mutate it). @@ -105,26 +119,18 @@ pub fn spawn_surface( flush_deadline = Some(Instant::now() + FLUSH_INTERVAL); } if pending.len() >= FLUSH_BYTES { - grid.feed(&pending); - let _ = bcast.send(std::mem::take(&mut pending)); + flush(&mut pending, &mut grid, &mut osc, &mut deterministic, &mut last_state, &detect_id, &bcast, &state_tx); flush_deadline = None; } } None => { - // Final flush on EOF. - if !pending.is_empty() { - grid.feed(&pending); - let _ = bcast.send(std::mem::take(&mut pending)); - } + flush(&mut pending, &mut grid, &mut osc, &mut deterministic, &mut last_state, &detect_id, &bcast, &state_tx); break; } } } _ = timer => { - if !pending.is_empty() { - grid.feed(&pending); - let _ = bcast.send(std::mem::take(&mut pending)); - } + flush(&mut pending, &mut grid, &mut osc, &mut deterministic, &mut last_state, &detect_id, &bcast, &state_tx); flush_deadline = None; } } @@ -136,6 +142,47 @@ pub fn spawn_surface( SurfaceHandle { id, workspace_id, tx } } +/// Feed pending bytes into the grid, run detectors, broadcast output, and emit a +/// state change (if any). No-op when pending is empty. +#[allow(clippy::too_many_arguments)] +fn flush( + pending: &mut Vec, + grid: &mut GridSurface, + osc: &mut Osc133Scanner, + deterministic: &mut bool, + last_state: &mut SurfaceState, + id: &SurfaceId, + bcast: &broadcast::Sender>, + state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, +) { + if pending.is_empty() { + return; + } + // Deterministic source: OSC 133 markers in this chunk. + // Emit each distinct state transition immediately so no marker is dropped + // when multiple arrive in a single flush (e.g. C + D in the same buffer). + let osc_states = osc.feed(&pending[..]); + let had_osc = !osc_states.is_empty(); + for st in osc_states { + *deterministic = true; + if st != *last_state { + *last_state = st; + let _ = state_tx.send((id.clone(), st)); + } + } + grid.feed(&pending[..]); + // Best-effort fallback only when no deterministic source is active. + if !had_osc && !*deterministic { + if let Some(st) = FallbackScanner::scan(&grid.tail_text(6)) { + if st != *last_state { + *last_state = st; + let _ = state_tx.send((id.clone(), st)); + } + } + } + let _ = bcast.send(std::mem::take(pending)); +} + #[cfg(test)] mod tests { use super::*; @@ -156,8 +203,9 @@ mod tests { async fn attach_receives_output() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap(); + let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); - let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx); + let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap(); @@ -180,8 +228,9 @@ mod tests { async fn exit_is_reported() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("exit 7")).unwrap(); + let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, mut exit_rx) = mpsc::unbounded_channel(); - let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx); + let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv()) .await.unwrap().unwrap(); assert_eq!(sid, SurfaceId("s_2".into())); @@ -192,8 +241,9 @@ mod tests { async fn attach_snapshot_reflects_prior_output() { let _serial = crate::test_support::serial(); let pty = PtyHandle::spawn(spec("printf SNAPME; sleep 0.5")).unwrap(); + let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); - let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx); + let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); // Give the child time to write and the actor time to flush into the grid. tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; @@ -213,8 +263,9 @@ mod tests { cwd: std::env::temp_dir().to_string_lossy().into(), agent_label: None, cols: 80, rows: 24, autostart: false, }; + let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, _rx) = mpsc::unbounded_channel(); - let handle = spawn_from_spec(SurfaceId("s_r".into()), WorkspaceId("w_1".into()), &spec, exit_tx).unwrap(); + let handle = spawn_from_spec(SurfaceId("s_r".into()), WorkspaceId("w_1".into()), &spec, vec![], false, state_tx, exit_tx).unwrap(); let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap(); let mut sub = reply_rx.await.unwrap(); @@ -228,4 +279,23 @@ mod tests { } assert!(got.contains("RESPAWN"), "got: {got:?}"); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn osc133_output_drives_state_detection() { + let _serial = crate::test_support::serial(); + let pty = PtyHandle::spawn(spec("printf '\\033]133;C\\007'; printf working; printf '\\033]133;D;0\\007'; sleep 0.3")).unwrap(); + let (state_tx, mut state_rx) = mpsc::unbounded_channel(); + let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); + let _h = spawn_surface(SurfaceId("s_o".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); + let mut seen = Vec::new(); + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2); + while tokio::time::Instant::now() < deadline { + if let Ok(Some((_, st))) = tokio::time::timeout(tokio::time::Duration::from_millis(100), state_rx.recv()).await { + seen.push(st); + if seen.contains(&SurfaceState::Done) { break; } + } + } + assert!(seen.contains(&SurfaceState::Work), "states: {seen:?}"); + assert!(seen.contains(&SurfaceState::Done), "states: {seen:?}"); + } }