feat(daemon): actor Snapshot message + dirty tracking + final snapshot on exit

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-15 15:47:40 +07:00
parent 1a7d04aab0
commit 0674872c9d
+80 -10
View File
@@ -7,6 +7,7 @@ use spacesh_proto::workspace::SurfaceSpec;
use spacesh_pty::{PtyHandle, SpawnSpec};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{Duration, Instant};
use crate::snapshot_store::SnapshotMsg;
/// Spawn (or restart) a surface actor from a persisted spec. Injects
/// SPACESH_SURFACE_ID into the child env, mirroring `new_surface`.
@@ -24,6 +25,7 @@ pub fn spawn_from_spec(
hooks_active: bool,
state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
snapshot_tx: mpsc::UnboundedSender<SnapshotMsg>,
) -> std::io::Result<SurfaceHandle> {
let mut env = vec![("SPACESH_SURFACE_ID".to_string(), id.0.clone())];
env.extend(extra_env);
@@ -35,7 +37,7 @@ pub fn spawn_from_spec(
rows: spec.rows,
env,
};
Ok(spawn_surface_deferred(id, workspace_id, spawn_spec, spec.cols, spec.rows, hooks_active, state_tx, exit_tx))
Ok(spawn_surface_deferred(id, workspace_id, spawn_spec, spec.cols, spec.rows, hooks_active, state_tx, exit_tx, snapshot_tx))
}
const BROADCAST_CAP: usize = 1024;
@@ -53,6 +55,8 @@ pub enum SurfaceMsg {
Attach { reply: oneshot::Sender<broadcast::Receiver<Vec<u8>>> },
/// Attach with snapshot: subscribe AND capture the grid in one actor turn.
AttachSnapshot { reply: oneshot::Sender<(Snapshot, broadcast::Receiver<Vec<u8>>)> },
/// On-demand snapshot without subscribing; bool = dirty since last snapshot.
Snapshot { reply: oneshot::Sender<(Snapshot, bool)> },
Close,
}
@@ -76,10 +80,11 @@ pub fn spawn_surface(
hooks_active: bool,
state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
snapshot_tx: mpsc::UnboundedSender<SnapshotMsg>,
) -> SurfaceHandle {
let (tx, rx) = mpsc::channel::<SurfaceMsg>(64);
let (bcast, _) = broadcast::channel::<Vec<u8>>(BROADCAST_CAP);
tokio::spawn(run_actor(id.clone(), pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, Vec::new()));
tokio::spawn(run_actor(id.clone(), pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, snapshot_tx, Vec::new()));
SurfaceHandle { id, workspace_id, tx }
}
@@ -97,6 +102,7 @@ pub fn spawn_surface_deferred(
hooks_active: bool,
state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
snapshot_tx: mpsc::UnboundedSender<SnapshotMsg>,
) -> SurfaceHandle {
let (tx, mut rx) = mpsc::channel::<SurfaceMsg>(64);
let (bcast, _) = broadcast::channel::<Vec<u8>>(BROADCAST_CAP);
@@ -122,6 +128,10 @@ pub fn spawn_surface_deferred(
let snap = snapshot_ansi(&GridSurface::new(cols, rows));
let _ = reply.send((snap, sub));
}
Some(SurfaceMsg::Snapshot { reply }) => {
let snap = snapshot_ansi(&GridSurface::new(cols, rows));
let _ = reply.send((snap, false));
}
Some(SurfaceMsg::Close) | None => break false,
}
}
@@ -135,7 +145,7 @@ pub fn spawn_surface_deferred(
spec.cols = cols;
spec.rows = rows;
match PtyHandle::spawn(spec) {
Ok(pty) => run_actor(actor_id, pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, prebuf).await,
Ok(pty) => run_actor(actor_id, pty, cols, rows, hooks_active, bcast, rx, state_tx, exit_tx, snapshot_tx, prebuf).await,
Err(_) => { let _ = exit_tx.send((actor_id, 127)); }
}
});
@@ -156,6 +166,7 @@ async fn run_actor(
mut rx: mpsc::Receiver<SurfaceMsg>,
state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
snapshot_tx: mpsc::UnboundedSender<SnapshotMsg>,
prebuffered_input: Vec<u8>,
) {
let actor_id = id.clone();
@@ -173,6 +184,7 @@ async fn run_actor(
// (hooks active, or any OSC 133 marker observed).
let mut deterministic = hooks_active;
let mut last_state = SurfaceState::Idle;
let mut dirty = false;
loop {
// Copy the deadline into an owned local so the timer future doesn't
@@ -202,8 +214,15 @@ async fn run_actor(
// this snapshot. Broadcasting here would double-render on reattach.
let sub = bcast.subscribe();
let snap = snapshot_ansi(&grid);
dirty = false;
let _ = reply.send((snap, sub));
}
Some(SurfaceMsg::Snapshot { reply }) => {
let snap = snapshot_ansi(&grid);
let was_dirty = dirty;
dirty = false;
let _ = reply.send((snap, was_dirty));
}
Some(SurfaceMsg::Close) | None => { pty.kill(); break; }
}
}
@@ -211,6 +230,7 @@ async fn run_actor(
match chunk {
Some(bytes) => {
pending.extend_from_slice(&bytes);
dirty = true;
if flush_deadline.is_none() {
flush_deadline = Some(Instant::now() + FLUSH_INTERVAL);
}
@@ -233,6 +253,8 @@ async fn run_actor(
}
}
}
let final_snap = snapshot_ansi(&grid);
let _ = snapshot_tx.send(SnapshotMsg::Save(actor_id.clone(), final_snap));
let code = pty.wait();
let _ = exit_tx.send((actor_id, code));
}
@@ -307,7 +329,8 @@ mod tests {
let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap();
let (state_tx, _state_rx) = mpsc::unbounded_channel();
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx);
let (snap_tx, _snap_rx) = mpsc::unbounded_channel();
let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx);
let (reply_tx, reply_rx) = oneshot::channel();
handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap();
@@ -332,7 +355,8 @@ mod tests {
let pty = PtyHandle::spawn(spec("exit 7")).unwrap();
let (state_tx, _state_rx) = mpsc::unbounded_channel();
let (exit_tx, mut exit_rx) = mpsc::unbounded_channel();
let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx);
let (snap_tx, _snap_rx) = mpsc::unbounded_channel();
let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx);
let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv())
.await.unwrap().unwrap();
assert_eq!(sid, SurfaceId("s_2".into()));
@@ -345,7 +369,8 @@ mod tests {
let pty = PtyHandle::spawn(spec("printf SNAPME; sleep 0.5")).unwrap();
let (state_tx, _state_rx) = mpsc::unbounded_channel();
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx);
let (snap_tx, _snap_rx) = mpsc::unbounded_channel();
let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx);
// Give the child time to write and the actor time to flush into the grid.
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
@@ -367,7 +392,8 @@ mod tests {
};
let (state_tx, _state_rx) = mpsc::unbounded_channel();
let (exit_tx, _rx) = mpsc::unbounded_channel();
let handle = spawn_from_spec(SurfaceId("s_r".into()), WorkspaceId("w_1".into()), &spec, vec![], false, state_tx, exit_tx).unwrap();
let (snap_tx, _snap_rx) = mpsc::unbounded_channel();
let handle = spawn_from_spec(SurfaceId("s_r".into()), WorkspaceId("w_1".into()), &spec, vec![], false, state_tx, exit_tx, snap_tx).unwrap();
let (reply_tx, reply_rx) = oneshot::channel();
handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap();
let mut sub = reply_rx.await.unwrap();
@@ -412,7 +438,8 @@ mod tests {
let _serial = crate::test_support::serial();
let (state_tx, _s) = mpsc::unbounded_channel();
let (exit_tx, _e) = mpsc::unbounded_channel();
let handle = spawn_surface_deferred(SurfaceId("s_d".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx);
let (snap_tx, _snap_rx) = mpsc::unbounded_channel();
let handle = spawn_surface_deferred(SurfaceId("s_d".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx, snap_tx);
let (rtx, rrx) = oneshot::channel();
handle.tx.send(SurfaceMsg::Attach { reply: rtx }).await.unwrap();
@@ -429,7 +456,8 @@ mod tests {
let _serial = crate::test_support::serial();
let (state_tx, _s) = mpsc::unbounded_channel();
let (exit_tx, _e) = mpsc::unbounded_channel();
let handle = spawn_surface_deferred(SurfaceId("s_f".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx);
let (snap_tx, _snap_rx) = mpsc::unbounded_channel();
let handle = spawn_surface_deferred(SurfaceId("s_f".into()), WorkspaceId("w_1".into()), stty_spec(), 80, 24, false, state_tx, exit_tx, snap_tx);
let (rtx, rrx) = oneshot::channel();
handle.tx.send(SurfaceMsg::Attach { reply: rtx }).await.unwrap();
@@ -445,7 +473,8 @@ mod tests {
let pty = PtyHandle::spawn(spec("printf '\\033]133;C\\007'; printf working; printf '\\033]133;D;0\\007'; sleep 0.3")).unwrap();
let (state_tx, mut state_rx) = mpsc::unbounded_channel();
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
let _h = spawn_surface(SurfaceId("s_o".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx);
let (snap_tx, _snap_rx) = mpsc::unbounded_channel();
let _h = spawn_surface(SurfaceId("s_o".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx);
let mut seen = Vec::new();
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2);
while tokio::time::Instant::now() < deadline {
@@ -457,4 +486,45 @@ mod tests {
assert!(seen.contains(&SurfaceState::Work), "states: {seen:?}");
assert!(seen.contains(&SurfaceState::Done), "states: {seen:?}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_msg_returns_grid_and_tracks_dirty() {
let _serial = crate::test_support::serial();
let pty = PtyHandle::spawn(spec("printf DIRTYME; sleep 0.4")).unwrap();
let (state_tx, _s) = mpsc::unbounded_channel();
let (exit_tx, _e) = mpsc::unbounded_channel();
let (snap_tx, _snap_rx) = mpsc::unbounded_channel();
let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx);
tokio::time::sleep(Duration::from_millis(150)).await;
let (reply_tx, reply_rx) = oneshot::channel();
handle.tx.send(SurfaceMsg::Snapshot { reply: reply_tx }).await.unwrap();
let (snap, dirty) = reply_rx.await.unwrap();
assert!(snap.ansi.contains("DIRTYME"), "snapshot: {:?}", snap.ansi);
assert!(dirty, "first snapshot after output should be dirty");
let (reply_tx, reply_rx) = oneshot::channel();
handle.tx.send(SurfaceMsg::Snapshot { reply: reply_tx }).await.unwrap();
let (_snap2, dirty2) = reply_rx.await.unwrap();
assert!(!dirty2, "second snapshot with no new output should be clean");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn final_snapshot_sent_on_exit() {
let _serial = crate::test_support::serial();
let pty = PtyHandle::spawn(spec("printf BYE")).unwrap(); // exits immediately
let (state_tx, _s) = mpsc::unbounded_channel();
let (exit_tx, _e) = mpsc::unbounded_channel();
let (snap_tx, mut snap_rx) = mpsc::unbounded_channel();
let _handle = spawn_surface(SurfaceId("s_x".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx, snap_tx);
let msg = tokio::time::timeout(Duration::from_secs(2), snap_rx.recv()).await.unwrap().unwrap();
match msg {
crate::snapshot_store::SnapshotMsg::Save(sid, snap) => {
assert_eq!(sid.0, "s_x");
assert!(snap.ansi.contains("BYE"), "final snapshot: {:?}", snap.ansi);
}
_ => panic!("expected a Save message on exit"),
}
}
}