From 0674872c9d08b97f5f41e2c3f9302441ad99f513 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Mon, 15 Jun 2026 15:47:40 +0700 Subject: [PATCH] feat(daemon): actor Snapshot message + dirty tracking + final snapshot on exit Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/spaceshd/src/surface.rs | 90 ++++++++++++++++++++++++++++++---- 1 file changed, 80 insertions(+), 10 deletions(-) diff --git a/crates/spaceshd/src/surface.rs b/crates/spaceshd/src/surface.rs index 8ce5c15..6594a98 100644 --- a/crates/spaceshd/src/surface.rs +++ b/crates/spaceshd/src/surface.rs @@ -7,6 +7,7 @@ use spacesh_proto::workspace::SurfaceSpec; use spacesh_pty::{PtyHandle, SpawnSpec}; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{Duration, Instant}; +use crate::snapshot_store::SnapshotMsg; /// Spawn (or restart) a surface actor from a persisted spec. Injects /// SPACESH_SURFACE_ID into the child env, mirroring `new_surface`. @@ -24,6 +25,7 @@ pub fn spawn_from_spec( hooks_active: bool, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, + snapshot_tx: mpsc::UnboundedSender, ) -> std::io::Result { let mut env = vec![("SPACESH_SURFACE_ID".to_string(), id.0.clone())]; env.extend(extra_env); @@ -35,7 +37,7 @@ pub fn spawn_from_spec( rows: spec.rows, env, }; - Ok(spawn_surface_deferred(id, workspace_id, spawn_spec, spec.cols, spec.rows, hooks_active, state_tx, exit_tx)) + Ok(spawn_surface_deferred(id, workspace_id, spawn_spec, spec.cols, spec.rows, hooks_active, state_tx, exit_tx, snapshot_tx)) } const BROADCAST_CAP: usize = 1024; @@ -53,6 +55,8 @@ pub enum SurfaceMsg { Attach { reply: oneshot::Sender>> }, /// Attach with snapshot: subscribe AND capture the grid in one actor turn. AttachSnapshot { reply: oneshot::Sender<(Snapshot, broadcast::Receiver>)> }, + /// On-demand snapshot without subscribing; bool = dirty since last snapshot. + Snapshot { reply: oneshot::Sender<(Snapshot, bool)> }, Close, } @@ -76,10 +80,11 @@ pub fn spawn_surface( hooks_active: bool, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, + snapshot_tx: mpsc::UnboundedSender, ) -> SurfaceHandle { let (tx, rx) = mpsc::channel::(64); let (bcast, _) = broadcast::channel::>(BROADCAST_CAP); - tokio::spawn(run_actor(id.clone(), pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, Vec::new())); + tokio::spawn(run_actor(id.clone(), pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, snapshot_tx, Vec::new())); SurfaceHandle { id, workspace_id, tx } } @@ -97,6 +102,7 @@ pub fn spawn_surface_deferred( hooks_active: bool, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, + snapshot_tx: mpsc::UnboundedSender, ) -> SurfaceHandle { let (tx, mut rx) = mpsc::channel::(64); let (bcast, _) = broadcast::channel::>(BROADCAST_CAP); @@ -122,6 +128,10 @@ pub fn spawn_surface_deferred( let snap = snapshot_ansi(&GridSurface::new(cols, rows)); let _ = reply.send((snap, sub)); } + Some(SurfaceMsg::Snapshot { reply }) => { + let snap = snapshot_ansi(&GridSurface::new(cols, rows)); + let _ = reply.send((snap, false)); + } Some(SurfaceMsg::Close) | None => break false, } } @@ -135,7 +145,7 @@ pub fn spawn_surface_deferred( spec.cols = cols; spec.rows = rows; match PtyHandle::spawn(spec) { - Ok(pty) => run_actor(actor_id, pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, prebuf).await, + Ok(pty) => run_actor(actor_id, pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, snapshot_tx, prebuf).await, Err(_) => { let _ = exit_tx.send((actor_id, 127)); } } }); @@ -156,6 +166,7 @@ async fn run_actor( mut rx: mpsc::Receiver, state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>, exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>, + snapshot_tx: mpsc::UnboundedSender, prebuffered_input: Vec, ) { let actor_id = id.clone(); @@ -173,6 +184,7 @@ async fn run_actor( // (hooks active, or any OSC 133 marker observed). let mut deterministic = hooks_active; let mut last_state = SurfaceState::Idle; + let mut dirty = false; loop { // Copy the deadline into an owned local so the timer future doesn't @@ -202,8 +214,15 @@ async fn run_actor( // this snapshot. Broadcasting here would double-render on reattach. let sub = bcast.subscribe(); let snap = snapshot_ansi(&grid); + dirty = false; let _ = reply.send((snap, sub)); } + Some(SurfaceMsg::Snapshot { reply }) => { + let snap = snapshot_ansi(&grid); + let was_dirty = dirty; + dirty = false; + let _ = reply.send((snap, was_dirty)); + } Some(SurfaceMsg::Close) | None => { pty.kill(); break; } } } @@ -211,6 +230,7 @@ async fn run_actor( match chunk { Some(bytes) => { pending.extend_from_slice(&bytes); + dirty = true; if flush_deadline.is_none() { flush_deadline = Some(Instant::now() + FLUSH_INTERVAL); } @@ -233,6 +253,8 @@ async fn run_actor( } } } + let final_snap = snapshot_ansi(&grid); + let _ = snapshot_tx.send(SnapshotMsg::Save(actor_id.clone(), final_snap)); let code = pty.wait(); let _ = exit_tx.send((actor_id, code)); } @@ -307,7 +329,8 @@ mod tests { let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap(); let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); - let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); + let (snap_tx, _snap_rx) = mpsc::unbounded_channel(); + let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx); let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap(); @@ -332,7 +355,8 @@ mod tests { let pty = PtyHandle::spawn(spec("exit 7")).unwrap(); let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, mut exit_rx) = mpsc::unbounded_channel(); - let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); + let (snap_tx, _snap_rx) = mpsc::unbounded_channel(); + let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx); let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv()) .await.unwrap().unwrap(); assert_eq!(sid, SurfaceId("s_2".into())); @@ -345,7 +369,8 @@ mod tests { let pty = PtyHandle::spawn(spec("printf SNAPME; sleep 0.5")).unwrap(); let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); - let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); + let (snap_tx, _snap_rx) = mpsc::unbounded_channel(); + let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx); // Give the child time to write and the actor time to flush into the grid. tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; @@ -367,7 +392,8 @@ mod tests { }; let (state_tx, _state_rx) = mpsc::unbounded_channel(); let (exit_tx, _rx) = mpsc::unbounded_channel(); - let handle = spawn_from_spec(SurfaceId("s_r".into()), WorkspaceId("w_1".into()), &spec, vec![], false, state_tx, exit_tx).unwrap(); + let (snap_tx, _snap_rx) = mpsc::unbounded_channel(); + let handle = spawn_from_spec(SurfaceId("s_r".into()), WorkspaceId("w_1".into()), &spec, vec![], false, state_tx, exit_tx, snap_tx).unwrap(); let (reply_tx, reply_rx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap(); let mut sub = reply_rx.await.unwrap(); @@ -412,7 +438,8 @@ mod tests { let _serial = crate::test_support::serial(); let (state_tx, _s) = mpsc::unbounded_channel(); let (exit_tx, _e) = mpsc::unbounded_channel(); - let handle = spawn_surface_deferred(SurfaceId("s_d".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx); + let (snap_tx, _snap_rx) = mpsc::unbounded_channel(); + let handle = spawn_surface_deferred(SurfaceId("s_d".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx, snap_tx); let (rtx, rrx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: rtx }).await.unwrap(); @@ -429,7 +456,8 @@ mod tests { let _serial = crate::test_support::serial(); let (state_tx, _s) = mpsc::unbounded_channel(); let (exit_tx, _e) = mpsc::unbounded_channel(); - let handle = spawn_surface_deferred(SurfaceId("s_f".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx); + let (snap_tx, _snap_rx) = mpsc::unbounded_channel(); + let handle = spawn_surface_deferred(SurfaceId("s_f".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx, snap_tx); let (rtx, rrx) = oneshot::channel(); handle.tx.send(SurfaceMsg::Attach { reply: rtx }).await.unwrap(); @@ -445,7 +473,8 @@ mod tests { let pty = PtyHandle::spawn(spec("printf '\\033]133;C\\007'; printf working; printf '\\033]133;D;0\\007'; sleep 0.3")).unwrap(); let (state_tx, mut state_rx) = mpsc::unbounded_channel(); let (exit_tx, _exit_rx) = mpsc::unbounded_channel(); - let _h = spawn_surface(SurfaceId("s_o".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx); + let (snap_tx, _snap_rx) = mpsc::unbounded_channel(); + let _h = spawn_surface(SurfaceId("s_o".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx); let mut seen = Vec::new(); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2); while tokio::time::Instant::now() < deadline { @@ -457,4 +486,45 @@ mod tests { assert!(seen.contains(&SurfaceState::Work), "states: {seen:?}"); assert!(seen.contains(&SurfaceState::Done), "states: {seen:?}"); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn snapshot_msg_returns_grid_and_tracks_dirty() { + let _serial = crate::test_support::serial(); + let pty = PtyHandle::spawn(spec("printf DIRTYME; sleep 0.4")).unwrap(); + let (state_tx, _s) = mpsc::unbounded_channel(); + let (exit_tx, _e) = mpsc::unbounded_channel(); + let (snap_tx, _snap_rx) = mpsc::unbounded_channel(); + let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx); + + tokio::time::sleep(Duration::from_millis(150)).await; + let (reply_tx, reply_rx) = oneshot::channel(); + handle.tx.send(SurfaceMsg::Snapshot { reply: reply_tx }).await.unwrap(); + let (snap, dirty) = reply_rx.await.unwrap(); + assert!(snap.ansi.contains("DIRTYME"), "snapshot: {:?}", snap.ansi); + assert!(dirty, "first snapshot after output should be dirty"); + + let (reply_tx, reply_rx) = oneshot::channel(); + handle.tx.send(SurfaceMsg::Snapshot { reply: reply_tx }).await.unwrap(); + let (_snap2, dirty2) = reply_rx.await.unwrap(); + assert!(!dirty2, "second snapshot with no new output should be clean"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn final_snapshot_sent_on_exit() { + let _serial = crate::test_support::serial(); + let pty = PtyHandle::spawn(spec("printf BYE")).unwrap(); // exits immediately + let (state_tx, _s) = mpsc::unbounded_channel(); + let (exit_tx, _e) = mpsc::unbounded_channel(); + let (snap_tx, mut snap_rx) = mpsc::unbounded_channel(); + let _handle = spawn_surface(SurfaceId("s_x".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx); + + let msg = tokio::time::timeout(Duration::from_secs(2), snap_rx.recv()).await.unwrap().unwrap(); + match msg { + crate::snapshot_store::SnapshotMsg::Save(sid, snap) => { + assert_eq!(sid.0, "s_x"); + assert!(snap.ansi.contains("BYE"), "final snapshot: {:?}", snap.ansi); + } + _ => panic!("expected a Save message on exit"), + } + } }