feat(daemon): grid feed + output coalescing + snapshot-on-attach (M1)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -183,7 +183,7 @@ async fn handle_request(
|
|||||||
};
|
};
|
||||||
match PtyHandle::spawn(spec) {
|
match PtyHandle::spawn(spec) {
|
||||||
Ok(pty) => {
|
Ok(pty) => {
|
||||||
let handle = spawn_surface(sid.clone(), workspace_id.clone(), pty, exit_tx.clone());
|
let handle = spawn_surface(sid.clone(), workspace_id.clone(), pty, cols, rows, exit_tx.clone());
|
||||||
// Bridge the surface's broadcast into the router as Output messages.
|
// Bridge the surface's broadcast into the router as Output messages.
|
||||||
spawn_output_bridge(sid.clone(), &handle, router_tx.clone());
|
spawn_output_bridge(sid.clone(), &handle, router_tx.clone());
|
||||||
reg.insert_surface(handle);
|
reg.insert_surface(handle);
|
||||||
@@ -219,10 +219,22 @@ async fn handle_request(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Cmd::Attach { surface_id } => {
|
Cmd::Attach { surface_id } => {
|
||||||
// M0 attach: register subscription, no snapshot yet (snapshot added in Task 13).
|
if let Some(s) = reg.surface(&surface_id) {
|
||||||
if reg.surface(&surface_id).is_some() {
|
let (reply_tx, reply_rx) = oneshot::channel();
|
||||||
|
if s.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.is_ok() {
|
||||||
|
if let Ok((snap, _sub)) = reply_rx.await {
|
||||||
subs.entry(surface_id.clone()).or_default().push(client);
|
subs.entry(surface_id.clone()).or_default().push(client);
|
||||||
let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0 }))).await;
|
let _ = out.send(ok(id, serde_json::json!({
|
||||||
|
"snapshot": snap.ansi,
|
||||||
|
"cols": snap.cols,
|
||||||
|
"rows": snap.rows,
|
||||||
|
"cursor_row": snap.cursor_row,
|
||||||
|
"cursor_col": snap.cursor_col,
|
||||||
|
}))).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let _ = out.send(err(id, "INTERNAL", "attach failed")).await;
|
||||||
} else {
|
} else {
|
||||||
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
let _ = out.send(err(id, "NOT_FOUND", "surface")).await;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,31 +1,35 @@
|
|||||||
|
use spacesh_core::{snapshot::snapshot_ansi, GridSurface};
|
||||||
|
use spacesh_core::snapshot::Snapshot;
|
||||||
use spacesh_proto::{SurfaceId, WorkspaceId};
|
use spacesh_proto::{SurfaceId, WorkspaceId};
|
||||||
use spacesh_pty::PtyHandle;
|
use spacesh_pty::PtyHandle;
|
||||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||||
|
use tokio::time::{Duration, Instant};
|
||||||
|
|
||||||
/// Output broadcast capacity (chunks). Lagging subscribers drop intermediate frames.
|
|
||||||
const BROADCAST_CAP: usize = 1024;
|
const BROADCAST_CAP: usize = 1024;
|
||||||
|
const FLUSH_INTERVAL: Duration = Duration::from_millis(6);
|
||||||
|
const FLUSH_BYTES: usize = 16 * 1024;
|
||||||
|
|
||||||
/// Messages sent to a surface actor.
|
|
||||||
pub enum SurfaceMsg {
|
pub enum SurfaceMsg {
|
||||||
Input(Vec<u8>),
|
Input(Vec<u8>),
|
||||||
Resize { cols: u16, rows: u16 },
|
Resize { cols: u16, rows: u16 },
|
||||||
/// Subscribe to the output stream. Reply carries a fresh receiver.
|
|
||||||
Attach { reply: oneshot::Sender<broadcast::Receiver<Vec<u8>>> },
|
Attach { reply: oneshot::Sender<broadcast::Receiver<Vec<u8>>> },
|
||||||
|
/// Attach with snapshot: subscribe AND capture the grid in one actor turn.
|
||||||
|
AttachSnapshot { reply: oneshot::Sender<(Snapshot, broadcast::Receiver<Vec<u8>>)> },
|
||||||
Close,
|
Close,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle the daemon keeps for a live surface.
|
|
||||||
pub struct SurfaceHandle {
|
pub struct SurfaceHandle {
|
||||||
pub id: SurfaceId,
|
pub id: SurfaceId,
|
||||||
pub workspace_id: WorkspaceId,
|
pub workspace_id: WorkspaceId,
|
||||||
pub tx: mpsc::Sender<SurfaceMsg>,
|
pub tx: mpsc::Sender<SurfaceMsg>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn the actor; returns its handle. `exit_tx` is fired once with the exit code.
|
|
||||||
pub fn spawn_surface(
|
pub fn spawn_surface(
|
||||||
id: SurfaceId,
|
id: SurfaceId,
|
||||||
workspace_id: WorkspaceId,
|
workspace_id: WorkspaceId,
|
||||||
mut pty: PtyHandle,
|
mut pty: PtyHandle,
|
||||||
|
cols: u16,
|
||||||
|
rows: u16,
|
||||||
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
|
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
|
||||||
) -> SurfaceHandle {
|
) -> SurfaceHandle {
|
||||||
let (tx, mut rx) = mpsc::channel::<SurfaceMsg>(64);
|
let (tx, mut rx) = mpsc::channel::<SurfaceMsg>(64);
|
||||||
@@ -33,21 +37,75 @@ pub fn spawn_surface(
|
|||||||
let actor_id = id.clone();
|
let actor_id = id.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
let mut grid = GridSurface::new(cols, rows);
|
||||||
|
let mut pending: Vec<u8> = Vec::with_capacity(FLUSH_BYTES);
|
||||||
|
let mut flush_deadline: Option<Instant> = None;
|
||||||
|
|
||||||
|
// Helper closure can't borrow across awaits cleanly; inline the flush logic.
|
||||||
loop {
|
loop {
|
||||||
|
// Copy the deadline into an owned local so the timer future doesn't
|
||||||
|
// hold a borrow of `flush_deadline` across the select! (other arms mutate it).
|
||||||
|
let next_flush = flush_deadline;
|
||||||
|
let timer = async move {
|
||||||
|
match next_flush {
|
||||||
|
Some(d) => tokio::time::sleep_until(d).await,
|
||||||
|
None => std::future::pending::<()>().await,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
msg = rx.recv() => {
|
msg = rx.recv() => {
|
||||||
match msg {
|
match msg {
|
||||||
Some(SurfaceMsg::Input(bytes)) => { let _ = pty.write_input(&bytes); }
|
Some(SurfaceMsg::Input(bytes)) => { let _ = pty.write_input(&bytes); }
|
||||||
Some(SurfaceMsg::Resize { cols, rows }) => { let _ = pty.resize(cols, rows); }
|
Some(SurfaceMsg::Resize { cols, rows }) => {
|
||||||
|
grid.resize(cols, rows);
|
||||||
|
let _ = pty.resize(cols, rows);
|
||||||
|
}
|
||||||
Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); }
|
Some(SurfaceMsg::Attach { reply }) => { let _ = reply.send(bcast.subscribe()); }
|
||||||
|
Some(SurfaceMsg::AttachSnapshot { reply }) => {
|
||||||
|
// Flush pending into the grid first so the snapshot is current,
|
||||||
|
// but DO NOT broadcast here; subscribe before any further output.
|
||||||
|
if !pending.is_empty() {
|
||||||
|
grid.feed(&pending);
|
||||||
|
let _ = bcast.send(std::mem::take(&mut pending));
|
||||||
|
flush_deadline = None;
|
||||||
|
}
|
||||||
|
let sub = bcast.subscribe();
|
||||||
|
let snap = snapshot_ansi(&grid);
|
||||||
|
let _ = reply.send((snap, sub));
|
||||||
|
}
|
||||||
Some(SurfaceMsg::Close) | None => { pty.kill(); break; }
|
Some(SurfaceMsg::Close) | None => { pty.kill(); break; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
chunk = pty.output.recv() => {
|
chunk = pty.output.recv() => {
|
||||||
match chunk {
|
match chunk {
|
||||||
Some(bytes) => { let _ = bcast.send(bytes); }
|
Some(bytes) => {
|
||||||
None => { break; } // PTY EOF
|
pending.extend_from_slice(&bytes);
|
||||||
|
if flush_deadline.is_none() {
|
||||||
|
flush_deadline = Some(Instant::now() + FLUSH_INTERVAL);
|
||||||
}
|
}
|
||||||
|
if pending.len() >= FLUSH_BYTES {
|
||||||
|
grid.feed(&pending);
|
||||||
|
let _ = bcast.send(std::mem::take(&mut pending));
|
||||||
|
flush_deadline = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Final flush on EOF.
|
||||||
|
if !pending.is_empty() {
|
||||||
|
grid.feed(&pending);
|
||||||
|
let _ = bcast.send(std::mem::take(&mut pending));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = timer => {
|
||||||
|
if !pending.is_empty() {
|
||||||
|
grid.feed(&pending);
|
||||||
|
let _ = bcast.send(std::mem::take(&mut pending));
|
||||||
|
}
|
||||||
|
flush_deadline = None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -78,7 +136,7 @@ mod tests {
|
|||||||
async fn attach_receives_output() {
|
async fn attach_receives_output() {
|
||||||
let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap();
|
let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap();
|
||||||
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
|
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
|
||||||
let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, exit_tx);
|
let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx);
|
||||||
|
|
||||||
let (reply_tx, reply_rx) = oneshot::channel();
|
let (reply_tx, reply_rx) = oneshot::channel();
|
||||||
handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap();
|
handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap();
|
||||||
@@ -101,10 +159,25 @@ mod tests {
|
|||||||
async fn exit_is_reported() {
|
async fn exit_is_reported() {
|
||||||
let pty = PtyHandle::spawn(spec("exit 7")).unwrap();
|
let pty = PtyHandle::spawn(spec("exit 7")).unwrap();
|
||||||
let (exit_tx, mut exit_rx) = mpsc::unbounded_channel();
|
let (exit_tx, mut exit_rx) = mpsc::unbounded_channel();
|
||||||
let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, exit_tx);
|
let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx);
|
||||||
let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv())
|
let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv())
|
||||||
.await.unwrap().unwrap();
|
.await.unwrap().unwrap();
|
||||||
assert_eq!(sid, SurfaceId("s_2".into()));
|
assert_eq!(sid, SurfaceId("s_2".into()));
|
||||||
assert_eq!(code, 7);
|
assert_eq!(code, 7);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn attach_snapshot_reflects_prior_output() {
|
||||||
|
let pty = PtyHandle::spawn(spec("printf SNAPME; sleep 0.5")).unwrap();
|
||||||
|
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
|
||||||
|
let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx);
|
||||||
|
|
||||||
|
// Give the child time to write and the actor time to flush into the grid.
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||||
|
|
||||||
|
let (reply_tx, reply_rx) = oneshot::channel();
|
||||||
|
handle.tx.send(SurfaceMsg::AttachSnapshot { reply: reply_tx }).await.unwrap();
|
||||||
|
let (snap, _sub) = reply_rx.await.unwrap();
|
||||||
|
assert!(snap.ansi.contains("SNAPME"), "snapshot: {:?}", snap.ansi);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user