feat(daemon): snapshot ticker + writer wiring + stopped-attach reads disk + cleanup on close
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -61,6 +61,9 @@ async fn run_daemon() -> Result<()> {
|
|||||||
let events_path = lifecycle::spacesh_dir()?.join("events.json");
|
let events_path = lifecycle::spacesh_dir()?.join("events.json");
|
||||||
let event_store: std::sync::Arc<dyn event_store::EventStore> =
|
let event_store: std::sync::Arc<dyn event_store::EventStore> =
|
||||||
std::sync::Arc::new(event_store::JsonEventStore::new(events_path));
|
std::sync::Arc::new(event_store::JsonEventStore::new(events_path));
|
||||||
|
let snapshots_dir = lifecycle::spacesh_dir()?.join("snapshots");
|
||||||
|
let snapshot_store: std::sync::Arc<dyn snapshot_store::SnapshotStore> =
|
||||||
|
std::sync::Arc::new(snapshot_store::JsonSnapshotStore::new(snapshots_dir));
|
||||||
eprintln!("spaceshd listening on {}", sock.display());
|
eprintln!("spaceshd listening on {}", sock.display());
|
||||||
server::serve(&sock, store, event_store).await
|
server::serve(&sock, store, event_store, snapshot_store).await
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -114,6 +114,10 @@ impl Registry {
|
|||||||
pub fn is_running(&self, sid: &SurfaceId) -> bool {
|
pub fn is_running(&self, sid: &SurfaceId) -> bool {
|
||||||
self.live.contains_key(sid)
|
self.live.contains_key(sid)
|
||||||
}
|
}
|
||||||
|
/// Ids of all currently-live surfaces.
|
||||||
|
pub fn live_ids(&self) -> Vec<SurfaceId> {
|
||||||
|
self.live.keys().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
// ---- surface state ----
|
// ---- surface state ----
|
||||||
|
|
||||||
|
|||||||
+122
-26
@@ -13,6 +13,7 @@ use crate::event_log::EventLog;
|
|||||||
use crate::event_store::{self, EventPersister, EventStore};
|
use crate::event_store::{self, EventPersister, EventStore};
|
||||||
use crate::persist::{self, Persister};
|
use crate::persist::{self, Persister};
|
||||||
use crate::registry::Registry;
|
use crate::registry::Registry;
|
||||||
|
use crate::snapshot_store::{SnapshotStore, SnapshotMsg, spawn_writer};
|
||||||
use crate::state_store::StateStore;
|
use crate::state_store::StateStore;
|
||||||
use crate::surface::{SurfaceMsg};
|
use crate::surface::{SurfaceMsg};
|
||||||
|
|
||||||
@@ -33,11 +34,13 @@ enum ServerMsg {
|
|||||||
ClientDisconnected { client: ClientId },
|
ClientDisconnected { client: ClientId },
|
||||||
/// A status change detected internally (OSC 133 / fallback) by a surface actor.
|
/// A status change detected internally (OSC 133 / fallback) by a surface actor.
|
||||||
StateDetected { surface_id: SurfaceId, state: SurfaceState },
|
StateDetected { surface_id: SurfaceId, state: SurfaceState },
|
||||||
|
/// Periodic snapshot tick: ask all live surfaces for a snapshot and persist dirty ones.
|
||||||
|
SnapshotTick,
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientId = u64;
|
type ClientId = u64;
|
||||||
|
|
||||||
pub async fn serve(socket: &Path, store: Arc<dyn StateStore>, event_store: Arc<dyn EventStore>) -> Result<()> {
|
pub async fn serve(socket: &Path, store: Arc<dyn StateStore>, event_store: Arc<dyn EventStore>, snapshot_store: Arc<dyn SnapshotStore>) -> Result<()> {
|
||||||
let listener = UnixListener::bind(socket)?;
|
let listener = UnixListener::bind(socket)?;
|
||||||
let (router_tx, router_rx) = mpsc::channel::<ServerMsg>(256);
|
let (router_tx, router_rx) = mpsc::channel::<ServerMsg>(256);
|
||||||
|
|
||||||
@@ -58,6 +61,20 @@ pub async fn serve(socket: &Path, store: Arc<dyn StateStore>, event_store: Arc<d
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let snapshot_tx = spawn_writer(snapshot_store.clone());
|
||||||
|
|
||||||
|
// Periodic snapshot tick → router.
|
||||||
|
let tick_router = router_tx.clone();
|
||||||
|
let interval_secs = crate::config::Config::load().snapshot_interval_secs();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut tick = tokio::time::interval(Duration::from_secs(interval_secs));
|
||||||
|
tick.tick().await; // consume the immediate first tick
|
||||||
|
loop {
|
||||||
|
tick.tick().await;
|
||||||
|
if tick_router.send(ServerMsg::SnapshotTick).await.is_err() { break; }
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let persister = persist::spawn(store.clone(), Duration::from_millis(500));
|
let persister = persist::spawn(store.clone(), Duration::from_millis(500));
|
||||||
let initial = store.load().unwrap_or_default();
|
let initial = store.load().unwrap_or_default();
|
||||||
let event_persister = event_store::spawn(event_store.clone(), Duration::from_millis(500));
|
let event_persister = event_store::spawn(event_store.clone(), Duration::from_millis(500));
|
||||||
@@ -66,7 +83,7 @@ pub async fn serve(socket: &Path, store: Arc<dyn StateStore>, event_store: Arc<d
|
|||||||
let shutdown = tokio::spawn(router(
|
let shutdown = tokio::spawn(router(
|
||||||
router_rx, router_tx.clone(), exit_tx, state_tx,
|
router_rx, router_tx.clone(), exit_tx, state_tx,
|
||||||
persister, initial, event_persister, event_initial,
|
persister, initial, event_persister, event_initial,
|
||||||
started_at_ms,
|
started_at_ms, snapshot_store, snapshot_tx,
|
||||||
));
|
));
|
||||||
|
|
||||||
let mut next_client: ClientId = 0;
|
let mut next_client: ClientId = 0;
|
||||||
@@ -128,6 +145,8 @@ async fn router(
|
|||||||
event_persister: EventPersister,
|
event_persister: EventPersister,
|
||||||
event_initial: crate::event_log::EventLogState,
|
event_initial: crate::event_log::EventLogState,
|
||||||
started_at_ms: u64,
|
started_at_ms: u64,
|
||||||
|
snapshot_store: Arc<dyn SnapshotStore>,
|
||||||
|
snapshot_tx: mpsc::UnboundedSender<SnapshotMsg>,
|
||||||
) {
|
) {
|
||||||
let mut reg = Registry::new();
|
let mut reg = Registry::new();
|
||||||
reg.restore(initial);
|
reg.restore(initial);
|
||||||
@@ -175,10 +194,24 @@ async fn router(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ServerMsg::SnapshotTick => {
|
||||||
|
let ids = reg.live_ids();
|
||||||
|
for sid in ids {
|
||||||
|
let Some(handle) = reg.live(&sid) else { continue };
|
||||||
|
let (reply_tx, reply_rx) = oneshot::channel();
|
||||||
|
if handle.tx.send(SurfaceMsg::Snapshot { reply: reply_tx }).await.is_err() { continue; }
|
||||||
|
if let Ok((snap, dirty)) = reply_rx.await {
|
||||||
|
if dirty {
|
||||||
|
let _ = snapshot_tx.send(SnapshotMsg::Save(sid.clone(), snap));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
ServerMsg::Request { id, cmd, client, out } => {
|
ServerMsg::Request { id, cmd, client, out } => {
|
||||||
handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients,
|
handle_request(id, cmd, client, out, &mut reg, &mut subs, &clients,
|
||||||
&router_tx, &exit_tx, &state_tx, &persister,
|
&router_tx, &exit_tx, &state_tx, &persister,
|
||||||
&mut event_log, &event_persister, started_at_ms, &mut config).await;
|
&mut event_log, &event_persister, started_at_ms, &mut config,
|
||||||
|
&snapshot_store, &snapshot_tx).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -284,6 +317,8 @@ async fn handle_request(
|
|||||||
event_persister: &EventPersister,
|
event_persister: &EventPersister,
|
||||||
started_at_ms: u64,
|
started_at_ms: u64,
|
||||||
config: &mut crate::config::Config,
|
config: &mut crate::config::Config,
|
||||||
|
snapshot_store: &Arc<dyn SnapshotStore>,
|
||||||
|
snapshot_tx: &mpsc::UnboundedSender<SnapshotMsg>,
|
||||||
) {
|
) {
|
||||||
use spacesh_proto::message::SplitDir;
|
use spacesh_proto::message::SplitDir;
|
||||||
use spacesh_proto::layout::{LayoutNode, Orient};
|
use spacesh_proto::layout::{LayoutNode, Orient};
|
||||||
@@ -312,7 +347,7 @@ async fn handle_request(
|
|||||||
agent_label: command, cols, rows, autostart: false,
|
agent_label: command, cols, rows, autostart: false,
|
||||||
};
|
};
|
||||||
let (env, hooks_active) = spawn_env(&sid, &spec);
|
let (env, hooks_active) = spawn_env(&sid, &spec);
|
||||||
match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) {
|
match crate::surface::spawn_from_spec(sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) {
|
||||||
Ok(handle) => {
|
Ok(handle) => {
|
||||||
spawn_output_bridge(sid.clone(), &handle, router_tx.clone());
|
spawn_output_bridge(sid.clone(), &handle, router_tx.clone());
|
||||||
reg.set_live(handle);
|
reg.set_live(handle);
|
||||||
@@ -344,7 +379,7 @@ async fn handle_request(
|
|||||||
let shell = command.clone().unwrap_or_else(|| config.resolved_shell());
|
let shell = command.clone().unwrap_or_else(|| config.resolved_shell());
|
||||||
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
|
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
|
||||||
let (env, hooks_active) = spawn_env(&new_sid, &spec);
|
let (env, hooks_active) = spawn_env(&new_sid, &spec);
|
||||||
match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) {
|
match crate::surface::spawn_from_spec(new_sid.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) {
|
||||||
Ok(handle) => {
|
Ok(handle) => {
|
||||||
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
|
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
|
||||||
reg.set_live(handle);
|
reg.set_live(handle);
|
||||||
@@ -418,7 +453,7 @@ async fn handle_request(
|
|||||||
let args = slot.map(|s| s.args.clone()).unwrap_or_default();
|
let args = slot.map(|s| s.args.clone()).unwrap_or_default();
|
||||||
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
|
let spec = SurfaceSpec { command: shell, args, cwd: ws.path.clone(), agent_label: command, cols: 80, rows: 24, autostart: false };
|
||||||
let (env, hooks_active) = spawn_env(&new_sid, &spec);
|
let (env, hooks_active) = spawn_env(&new_sid, &spec);
|
||||||
match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) {
|
match crate::surface::spawn_from_spec(new_sid.clone(), workspace_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) {
|
||||||
Ok(handle) => {
|
Ok(handle) => {
|
||||||
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
|
spawn_output_bridge(new_sid.clone(), &handle, router_tx.clone());
|
||||||
reg.set_live(handle);
|
reg.set_live(handle);
|
||||||
@@ -449,7 +484,7 @@ async fn handle_request(
|
|||||||
};
|
};
|
||||||
let ws_id = reg.workspace_of(&surface_id).unwrap();
|
let ws_id = reg.workspace_of(&surface_id).unwrap();
|
||||||
let (env, hooks_active) = spawn_env(&surface_id, &spec);
|
let (env, hooks_active) = spawn_env(&surface_id, &spec);
|
||||||
match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone()) {
|
match crate::surface::spawn_from_spec(surface_id.clone(), ws_id.clone(), &spec, env, hooks_active, state_tx.clone(), exit_tx.clone(), snapshot_tx.clone()) {
|
||||||
Ok(handle) => {
|
Ok(handle) => {
|
||||||
spawn_output_bridge(surface_id.clone(), &handle, router_tx.clone());
|
spawn_output_bridge(surface_id.clone(), &handle, router_tx.clone());
|
||||||
reg.set_live(handle);
|
reg.set_live(handle);
|
||||||
@@ -464,6 +499,7 @@ async fn handle_request(
|
|||||||
Cmd::CloseWorkspace { workspace_id } => {
|
Cmd::CloseWorkspace { workspace_id } => {
|
||||||
let ids = reg.close_workspace(&workspace_id);
|
let ids = reg.close_workspace(&workspace_id);
|
||||||
for sid in &ids { crate::hooks::cleanup(sid); crate::hooks::cleanup_shell(sid); subs.remove(sid); }
|
for sid in &ids { crate::hooks::cleanup(sid); crate::hooks::cleanup_shell(sid); subs.remove(sid); }
|
||||||
|
for sid in &ids { let _ = snapshot_tx.send(SnapshotMsg::Remove(sid.clone())); }
|
||||||
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceClosed { workspace_id: workspace_id.clone() }));
|
broadcast_evt(clients, &Envelope::Evt(Evt::WorkspaceClosed { workspace_id: workspace_id.clone() }));
|
||||||
persister.mark_dirty(reg.persist_state());
|
persister.mark_dirty(reg.persist_state());
|
||||||
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
let _ = out.send(ok(id, serde_json::Value::Null)).await;
|
||||||
@@ -553,10 +589,20 @@ async fn handle_request(
|
|||||||
}
|
}
|
||||||
let _ = out.send(err(id, "INTERNAL", "attach failed")).await;
|
let _ = out.send(err(id, "INTERNAL", "attach failed")).await;
|
||||||
} else {
|
} else {
|
||||||
// stopped panel: no live stream, return an empty snapshot so the GUI shows the restart overlay.
|
// stopped panel: no live stream. Paint the last on-disk screen if we have one.
|
||||||
|
match snapshot_store.load(&surface_id) {
|
||||||
|
Some(snap) => {
|
||||||
|
let _ = out.send(ok(id, serde_json::json!({
|
||||||
|
"snapshot": snap.ansi, "cols": snap.cols, "rows": snap.rows,
|
||||||
|
"cursor_row": snap.cursor_row, "cursor_col": snap.cursor_col, "stopped": true,
|
||||||
|
}))).await;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0, "stopped": true }))).await;
|
let _ = out.send(ok(id, serde_json::json!({ "snapshot": "", "cols": 0, "rows": 0, "stopped": true }))).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Cmd::Detach { surface_id } => {
|
Cmd::Detach { surface_id } => {
|
||||||
if let Some(list) = subs.get_mut(&surface_id) { list.retain(|c| *c != client); }
|
if let Some(list) = subs.get_mut(&surface_id) { list.retain(|c| *c != client); }
|
||||||
@@ -580,6 +626,7 @@ async fn handle_request(
|
|||||||
subs.remove(&surface_id);
|
subs.remove(&surface_id);
|
||||||
crate::hooks::cleanup(&surface_id);
|
crate::hooks::cleanup(&surface_id);
|
||||||
crate::hooks::cleanup_shell(&surface_id);
|
crate::hooks::cleanup_shell(&surface_id);
|
||||||
|
let _ = snapshot_tx.send(SnapshotMsg::Remove(surface_id.clone()));
|
||||||
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() }));
|
broadcast_evt(clients, &Envelope::Evt(Evt::SurfaceClosed { surface_id: surface_id.clone() }));
|
||||||
if let Some(ws_id) = ws_id {
|
if let Some(ws_id) = ws_id {
|
||||||
emit_layout(reg, &ws_id, clients);
|
emit_layout(reg, &ws_id, clients);
|
||||||
@@ -739,6 +786,7 @@ fn spawn_output_bridge(
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
|
use crate::snapshot_store::NullSnapshotStore;
|
||||||
|
|
||||||
async fn req(stream: &mut UnixStream, id: u64, cmd: Cmd) -> Envelope {
|
async fn req(stream: &mut UnixStream, id: u64, cmd: Cmd) -> Envelope {
|
||||||
write_frame(stream, &Envelope::Req { id, cmd }).await.unwrap();
|
write_frame(stream, &Envelope::Req { id, cmd }).await.unwrap();
|
||||||
@@ -787,7 +835,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
@@ -829,7 +877,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
let r = req(&mut s, 1, Cmd::Input {
|
let r = req(&mut s, 1, Cmd::Input {
|
||||||
@@ -855,7 +903,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
// First client: open, new surface that prints a marker, attach, then disconnect.
|
// First client: open, new surface that prints a marker, attach, then disconnect.
|
||||||
@@ -894,7 +942,7 @@ mod tests {
|
|||||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock2 = sock.clone();
|
let sock2 = sock.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
|
||||||
@@ -924,7 +972,7 @@ mod tests {
|
|||||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock2 = sock.clone();
|
let sock2 = sock.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
|
||||||
@@ -968,7 +1016,7 @@ mod tests {
|
|||||||
// per-test dir so instance B reads from disk what instance A persisted.
|
// per-test dir so instance B reads from disk what instance A persisted.
|
||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock2 = sock.clone();
|
let sock2 = sock.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
||||||
@@ -985,7 +1033,7 @@ mod tests {
|
|||||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
||||||
let event_store_b = make_event_store(&dir);
|
let event_store_b = make_event_store(&dir);
|
||||||
let sb2 = sock_b.clone();
|
let sb2 = sock_b.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; });
|
tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sb2).await;
|
wait_for_socket(&sb2).await;
|
||||||
let mut s2 = UnixStream::connect(&sb2).await.unwrap();
|
let mut s2 = UnixStream::connect(&sb2).await.unwrap();
|
||||||
let r = req(&mut s2, 1, Cmd::Status).await;
|
let r = req(&mut s2, 1, Cmd::Status).await;
|
||||||
@@ -1007,7 +1055,7 @@ mod tests {
|
|||||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock2 = sock.clone();
|
let sock2 = sock.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
|
||||||
@@ -1045,7 +1093,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
// Control connection: open workspace and spawn surface.
|
// Control connection: open workspace and spawn surface.
|
||||||
@@ -1098,7 +1146,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
// Control connection: open workspace and spawn surface.
|
// Control connection: open workspace and spawn surface.
|
||||||
@@ -1150,7 +1198,7 @@ mod tests {
|
|||||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock2 = sock.clone();
|
let sock2 = sock.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
// Control connection: open workspace and spawn a surface that emits OSC 133.
|
// Control connection: open workspace and spawn a surface that emits OSC 133.
|
||||||
@@ -1201,7 +1249,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
// Observer connection to catch the EventsRead broadcast.
|
// Observer connection to catch the EventsRead broadcast.
|
||||||
@@ -1269,7 +1317,7 @@ mod tests {
|
|||||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock2 = sock.clone();
|
let sock2 = sock.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock2, store, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock2, store, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
@@ -1311,7 +1359,7 @@ mod tests {
|
|||||||
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(state_path.clone()));
|
||||||
let event_store_b = make_event_store(&dir);
|
let event_store_b = make_event_store(&dir);
|
||||||
let sb2 = sock_b.clone();
|
let sb2 = sock_b.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b).await; });
|
tokio::spawn(async move { let _ = serve(&sock_b, store_b, event_store_b, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sb2).await;
|
wait_for_socket(&sb2).await;
|
||||||
|
|
||||||
let mut s2 = UnixStream::connect(&sb2).await.unwrap();
|
let mut s2 = UnixStream::connect(&sb2).await.unwrap();
|
||||||
@@ -1339,7 +1387,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
// Observer connection.
|
// Observer connection.
|
||||||
@@ -1401,7 +1449,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
|
||||||
@@ -1424,7 +1472,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
let mut s = UnixStream::connect(&sock).await.unwrap();
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
|
||||||
@@ -1471,7 +1519,7 @@ mod tests {
|
|||||||
let event_store = make_event_store(&dir);
|
let event_store = make_event_store(&dir);
|
||||||
let sock_for_task = sock.clone();
|
let sock_for_task = sock.clone();
|
||||||
let store2 = store.clone();
|
let store2 = store.clone();
|
||||||
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store).await; });
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, std::sync::Arc::new(NullSnapshotStore)).await; });
|
||||||
wait_for_socket(&sock).await;
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
// Control connection: open, spawn, zoom.
|
// Control connection: open, spawn, zoom.
|
||||||
@@ -1511,4 +1559,52 @@ mod tests {
|
|||||||
}
|
}
|
||||||
assert!(saw_cleared, "expected a WorkspaceChanged broadcast with cleared zoom after closing the zoomed surface");
|
assert!(saw_cleared, "expected a WorkspaceChanged broadcast with cleared zoom after closing the zoomed surface");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn stopped_attach_returns_disk_snapshot() {
|
||||||
|
let _serial = crate::test_support::serial();
|
||||||
|
let dir = tempdir_path();
|
||||||
|
let sock = dir.join("sock");
|
||||||
|
let store: std::sync::Arc<dyn crate::state_store::StateStore> =
|
||||||
|
std::sync::Arc::new(crate::state_store::JsonStateStore::new(dir.join("state.json")));
|
||||||
|
let event_store = make_event_store(&dir);
|
||||||
|
// Use a real JsonSnapshotStore so the on-exit dump actually lands on disk.
|
||||||
|
let snap_dir = dir.join("snapshots");
|
||||||
|
let snapshot_store: std::sync::Arc<dyn crate::snapshot_store::SnapshotStore> =
|
||||||
|
std::sync::Arc::new(crate::snapshot_store::JsonSnapshotStore::new(snap_dir.clone()));
|
||||||
|
let sock_for_task = sock.clone();
|
||||||
|
let store2 = store.clone();
|
||||||
|
let snap_store2 = snapshot_store.clone();
|
||||||
|
tokio::spawn(async move { let _ = serve(&sock_for_task, store2, event_store, snap_store2).await; });
|
||||||
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
|
// Open workspace, spawn a surface that prints a unique marker then exits quickly.
|
||||||
|
let surface_id;
|
||||||
|
{
|
||||||
|
let mut s = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
let r = req(&mut s, 1, Cmd::Open { path: std::env::temp_dir().to_string_lossy().into() }).await;
|
||||||
|
let ws = res_data(&r)["workspace_id"].as_str().unwrap().to_string();
|
||||||
|
let r = req(&mut s, 2, Cmd::NewSurface {
|
||||||
|
workspace_id: spacesh_proto::WorkspaceId(ws),
|
||||||
|
command: Some("/bin/sh".into()),
|
||||||
|
args: vec!["-c".into(), "printf SNAPDISK; sleep 0.2".into()],
|
||||||
|
cols: 80, rows: 24,
|
||||||
|
}).await;
|
||||||
|
surface_id = spacesh_proto::SurfaceId(res_data(&r)["surface_id"].as_str().unwrap().to_string());
|
||||||
|
// Give the process time to run, exit, and the actor to dump its snapshot to the writer.
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(1500)).await;
|
||||||
|
// s drops here
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-verify the socket is still alive.
|
||||||
|
wait_for_socket(&sock).await;
|
||||||
|
|
||||||
|
// Fresh client: attach to the now-stopped surface.
|
||||||
|
let mut s2 = UnixStream::connect(&sock).await.unwrap();
|
||||||
|
let r = req(&mut s2, 1, Cmd::Attach { surface_id: surface_id.clone() }).await;
|
||||||
|
let data = res_data(&r);
|
||||||
|
assert_eq!(data["stopped"].as_bool(), Some(true), "surface should be stopped");
|
||||||
|
let snap = data["snapshot"].as_str().unwrap_or("");
|
||||||
|
assert!(snap.contains("SNAPDISK"), "on-disk snapshot should contain SNAPDISK, got: {snap:?}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user