feat(daemon): actor OSC133/fallback detection → set_state, hook/shell spawn env, cleanup

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 23:05:18 +07:00
parent 4396ad7909
commit c35585755e
2 changed files with 170 additions and 25 deletions
+88 -18
View File
@@ -1,6 +1,8 @@
use spacesh_core::{snapshot::snapshot_ansi, GridSurface};
use spacesh_core::snapshot::Snapshot;
use spacesh_core::detect::{FallbackScanner, Osc133Scanner};
use spacesh_proto::{SurfaceId, WorkspaceId};
use spacesh_proto::status::SurfaceState;
use spacesh_proto::workspace::SurfaceSpec;
use spacesh_pty::{PtyHandle, SpawnSpec};
use tokio::sync::{broadcast, mpsc, oneshot};
@@ -12,18 +14,23 @@ pub fn spawn_from_spec(
id: SurfaceId,
workspace_id: WorkspaceId,
spec: &SurfaceSpec,
extra_env: Vec<(String, String)>,
hooks_active: bool,
state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
) -> std::io::Result<SurfaceHandle> {
let mut env = vec![("SPACESH_SURFACE_ID".to_string(), id.0.clone())];
env.extend(extra_env);
let pty = PtyHandle::spawn(SpawnSpec {
command: spec.command.clone(),
args: spec.args.clone(),
cwd: std::path::PathBuf::from(&spec.cwd),
cols: spec.cols,
rows: spec.rows,
env: vec![("SPACESH_SURFACE_ID".into(), id.0.clone())],
env,
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
Ok(spawn_surface(id, workspace_id, pty, spec.cols, spec.rows, exit_tx))
Ok(spawn_surface(id, workspace_id, pty, spec.cols, spec.rows, hooks_active, state_tx, exit_tx))
}
const BROADCAST_CAP: usize = 1024;
@@ -52,18 +59,25 @@ pub fn spawn_surface(
mut pty: PtyHandle,
cols: u16,
rows: u16,
hooks_active: bool,
state_tx: mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
exit_tx: mpsc::UnboundedSender<(SurfaceId, i32)>,
) -> SurfaceHandle {
let (tx, mut rx) = mpsc::channel::<SurfaceMsg>(64);
let (bcast, _) = broadcast::channel::<Vec<u8>>(BROADCAST_CAP);
let actor_id = id.clone();
let detect_id = id.clone();
tokio::spawn(async move {
let mut grid = GridSurface::new(cols, rows);
let mut pending: Vec<u8> = Vec::with_capacity(FLUSH_BYTES);
let mut flush_deadline: Option<Instant> = None;
let mut osc = Osc133Scanner::new();
// `deterministic` suppresses fallback once a reliable source is seen
// (hooks active, or any OSC 133 marker observed).
let mut deterministic = hooks_active;
let mut last_state = SurfaceState::Idle;
// Helper closure can't borrow across awaits cleanly; inline the flush logic.
loop {
// Copy the deadline into an owned local so the timer future doesn't
// hold a borrow of `flush_deadline` across the select! (other arms mutate it).
@@ -105,26 +119,18 @@ pub fn spawn_surface(
flush_deadline = Some(Instant::now() + FLUSH_INTERVAL);
}
if pending.len() >= FLUSH_BYTES {
grid.feed(&pending);
let _ = bcast.send(std::mem::take(&mut pending));
flush(&mut pending, &mut grid, &mut osc, &mut deterministic, &mut last_state, &detect_id, &bcast, &state_tx);
flush_deadline = None;
}
}
None => {
// Final flush on EOF.
if !pending.is_empty() {
grid.feed(&pending);
let _ = bcast.send(std::mem::take(&mut pending));
}
flush(&mut pending, &mut grid, &mut osc, &mut deterministic, &mut last_state, &detect_id, &bcast, &state_tx);
break;
}
}
}
_ = timer => {
if !pending.is_empty() {
grid.feed(&pending);
let _ = bcast.send(std::mem::take(&mut pending));
}
flush(&mut pending, &mut grid, &mut osc, &mut deterministic, &mut last_state, &detect_id, &bcast, &state_tx);
flush_deadline = None;
}
}
@@ -136,6 +142,47 @@ pub fn spawn_surface(
SurfaceHandle { id, workspace_id, tx }
}
/// Feed pending bytes into the grid, run detectors, broadcast output, and emit a
/// state change (if any). No-op when pending is empty.
#[allow(clippy::too_many_arguments)]
fn flush(
pending: &mut Vec<u8>,
grid: &mut GridSurface,
osc: &mut Osc133Scanner,
deterministic: &mut bool,
last_state: &mut SurfaceState,
id: &SurfaceId,
bcast: &broadcast::Sender<Vec<u8>>,
state_tx: &mpsc::UnboundedSender<(SurfaceId, SurfaceState)>,
) {
if pending.is_empty() {
return;
}
// Deterministic source: OSC 133 markers in this chunk.
// Emit each distinct state transition immediately so no marker is dropped
// when multiple arrive in a single flush (e.g. C + D in the same buffer).
let osc_states = osc.feed(&pending[..]);
let had_osc = !osc_states.is_empty();
for st in osc_states {
*deterministic = true;
if st != *last_state {
*last_state = st;
let _ = state_tx.send((id.clone(), st));
}
}
grid.feed(&pending[..]);
// Best-effort fallback only when no deterministic source is active.
if !had_osc && !*deterministic {
if let Some(st) = FallbackScanner::scan(&grid.tail_text(6)) {
if st != *last_state {
*last_state = st;
let _ = state_tx.send((id.clone(), st));
}
}
}
let _ = bcast.send(std::mem::take(pending));
}
#[cfg(test)]
mod tests {
use super::*;
@@ -156,8 +203,9 @@ mod tests {
async fn attach_receives_output() {
let _serial = crate::test_support::serial();
let pty = PtyHandle::spawn(spec("printf HELLO; sleep 0.3")).unwrap();
let (state_tx, _state_rx) = mpsc::unbounded_channel();
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx);
let handle = spawn_surface(SurfaceId("s_1".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx);
let (reply_tx, reply_rx) = oneshot::channel();
handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap();
@@ -180,8 +228,9 @@ mod tests {
async fn exit_is_reported() {
let _serial = crate::test_support::serial();
let pty = PtyHandle::spawn(spec("exit 7")).unwrap();
let (state_tx, _state_rx) = mpsc::unbounded_channel();
let (exit_tx, mut exit_rx) = mpsc::unbounded_channel();
let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx);
let _handle = spawn_surface(SurfaceId("s_2".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx);
let (sid, code) = tokio::time::timeout(tokio::time::Duration::from_secs(3), exit_rx.recv())
.await.unwrap().unwrap();
assert_eq!(sid, SurfaceId("s_2".into()));
@@ -192,8 +241,9 @@ mod tests {
async fn attach_snapshot_reflects_prior_output() {
let _serial = crate::test_support::serial();
let pty = PtyHandle::spawn(spec("printf SNAPME; sleep 0.5")).unwrap();
let (state_tx, _state_rx) = mpsc::unbounded_channel();
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, exit_tx);
let handle = spawn_surface(SurfaceId("s_s".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx);
// Give the child time to write and the actor time to flush into the grid.
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
@@ -213,8 +263,9 @@ mod tests {
cwd: std::env::temp_dir().to_string_lossy().into(),
agent_label: None, cols: 80, rows: 24, autostart: false,
};
let (state_tx, _state_rx) = mpsc::unbounded_channel();
let (exit_tx, _rx) = mpsc::unbounded_channel();
let handle = spawn_from_spec(SurfaceId("s_r".into()), WorkspaceId("w_1".into()), &spec, exit_tx).unwrap();
let handle = spawn_from_spec(SurfaceId("s_r".into()), WorkspaceId("w_1".into()), &spec, vec![], false, state_tx, exit_tx).unwrap();
let (reply_tx, reply_rx) = oneshot::channel();
handle.tx.send(SurfaceMsg::Attach { reply: reply_tx }).await.unwrap();
let mut sub = reply_rx.await.unwrap();
@@ -228,4 +279,23 @@ mod tests {
}
assert!(got.contains("RESPAWN"), "got: {got:?}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn osc133_output_drives_state_detection() {
let _serial = crate::test_support::serial();
let pty = PtyHandle::spawn(spec("printf '\\033]133;C\\007'; printf working; printf '\\033]133;D;0\\007'; sleep 0.3")).unwrap();
let (state_tx, mut state_rx) = mpsc::unbounded_channel();
let (exit_tx, _exit_rx) = mpsc::unbounded_channel();
let _h = spawn_surface(SurfaceId("s_o".into()), WorkspaceId("w_1".into()), pty, 80, 24, false, state_tx, exit_tx);
let mut seen = Vec::new();
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(2);
while tokio::time::Instant::now() < deadline {
if let Ok(Some((_, st))) = tokio::time::timeout(tokio::time::Duration::from_millis(100), state_rx.recv()).await {
seen.push(st);
if seen.contains(&SurfaceState::Done) { break; }
}
}
assert!(seen.contains(&SurfaceState::Work), "states: {seen:?}");
assert!(seen.contains(&SurfaceState::Done), "states: {seen:?}");
}
}