fix(app): bridge auto-reconnect so daemon restart no longer bricks the GUI
The Tauri bridge connected to the daemon once at startup and held a single stream with no recovery: when the daemon exited (Restart/Stop, crash, or an update), the reader emitted spacesh:disconnected and died, and every later request went through the dead writer forever — the GUI was permanently stuck (settings frozen, offline). Since the bridge is Rust-side state that survives a webview reload, even Cmd+R didn't recover it. - bridge.rs: requests now reconnect-and-retry on failure with a single-flight guard (generation counter) so concurrent failures collapse into one reconnect and never open duplicate connections; a 5s reply timeout catches silently-dropped connections. ensure_daemon respawns the daemon if it exited. On success the bridge emits spacesh:reconnected. - App.tsx: on spacesh:reconnected, bump a connection epoch that keys LayoutEngine, remounting terminals so they re-attach (snapshot + live stream) to the restarted daemon; also reload health/config/status. - Settings: drop the Stop button — with lazy daemon spawn any GUI request resurrects the daemon, so an in-GUI "stop" is contradictory. Restart now works end to end (shutdown → reconnect respawns → panels re-attach). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+85
-13
@@ -2,6 +2,7 @@ use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use base64::Engine;
|
||||
@@ -18,8 +19,16 @@ use tokio::sync::{mpsc, oneshot, Mutex};
|
||||
|
||||
pub struct Bridge {
|
||||
next_id: AtomicU64,
|
||||
/// Outbound frames to the daemon.
|
||||
tx: mpsc::Sender<Envelope>,
|
||||
/// For respawning/reconnecting the daemon connection after it drops.
|
||||
app: AppHandle,
|
||||
sock: PathBuf,
|
||||
/// Bumped on every successful reconnect; lets concurrent failing requests
|
||||
/// collapse into a single reconnect (single-flight).
|
||||
gen: AtomicU64,
|
||||
/// Outbound frames to the daemon. Swapped on reconnect.
|
||||
tx: Mutex<mpsc::Sender<Envelope>>,
|
||||
/// Serializes reconnect attempts.
|
||||
reconnect_lock: Mutex<()>,
|
||||
/// Pending request id → reply slot.
|
||||
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
||||
/// surface id → output channel into the webview.
|
||||
@@ -85,28 +94,91 @@ async fn ensure_daemon(sock: &PathBuf) -> Result<UnixStream> {
|
||||
anyhow::bail!("daemon spawned ({}) but did not bind {} in time", daemon.display(), sock.display())
|
||||
}
|
||||
|
||||
/// Connect (spawning the daemon if needed) and start the reader/writer tasks,
|
||||
/// returning the outbound sender. Shared `pending`/`out_channels` are reused so
|
||||
/// replies and live output keep routing across reconnects.
|
||||
async fn spawn_connection(
|
||||
sock: &PathBuf,
|
||||
app: &AppHandle,
|
||||
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
||||
out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>,
|
||||
) -> Result<mpsc::Sender<Envelope>> {
|
||||
let stream = ensure_daemon(sock).await?;
|
||||
let (read_half, write_half) = stream.into_split();
|
||||
let (tx, rx) = mpsc::channel::<Envelope>(256);
|
||||
spawn_writer(write_half, rx);
|
||||
spawn_reader(read_half, app.clone(), pending, out_channels);
|
||||
Ok(tx)
|
||||
}
|
||||
|
||||
impl Bridge {
|
||||
pub async fn connect(app: AppHandle) -> Result<Self> {
|
||||
let sock = socket_path()?;
|
||||
let stream = ensure_daemon(&sock).await?;
|
||||
let (read_half, write_half) = stream.into_split();
|
||||
|
||||
let (tx, rx) = mpsc::channel::<Envelope>(256);
|
||||
let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>> = Arc::default();
|
||||
let out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>> = Arc::default();
|
||||
let tx = spawn_connection(&sock, &app, pending.clone(), out_channels.clone()).await?;
|
||||
Ok(Self {
|
||||
next_id: AtomicU64::new(1),
|
||||
app,
|
||||
sock,
|
||||
gen: AtomicU64::new(0),
|
||||
tx: Mutex::new(tx),
|
||||
reconnect_lock: Mutex::new(()),
|
||||
pending,
|
||||
out_channels,
|
||||
})
|
||||
}
|
||||
|
||||
spawn_writer(write_half, rx);
|
||||
spawn_reader(read_half, app, pending.clone(), out_channels.clone());
|
||||
/// Re-establish the daemon connection. Single-flight: callers pass the `gen`
|
||||
/// they observed; if another caller already reconnected (gen advanced), this
|
||||
/// is a no-op so we never open duplicate connections.
|
||||
async fn reconnect(&self, seen_gen: u64) -> Result<()> {
|
||||
let _guard = self.reconnect_lock.lock().await;
|
||||
if self.gen.load(Ordering::Acquire) != seen_gen {
|
||||
return Ok(());
|
||||
}
|
||||
// Drop in-flight reply slots — their connection is gone; they'll error out.
|
||||
self.pending.lock().await.clear();
|
||||
let new_tx = spawn_connection(&self.sock, &self.app, self.pending.clone(), self.out_channels.clone()).await?;
|
||||
*self.tx.lock().await = new_tx;
|
||||
self.gen.fetch_add(1, Ordering::Release);
|
||||
let _ = self.app.emit("spacesh:reconnected", ());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Ok(Self { next_id: AtomicU64::new(1), tx, pending, out_channels })
|
||||
/// Send one request and await its reply with a timeout. Errors if the writer
|
||||
/// is gone, the reply slot is dropped, or no reply arrives in time.
|
||||
async fn send_once(&self, id: u64, env: Envelope) -> Result<Envelope> {
|
||||
let (reply_tx, reply_rx) = oneshot::channel();
|
||||
self.pending.lock().await.insert(id, reply_tx);
|
||||
let tx = self.tx.lock().await.clone();
|
||||
if tx.send(env).await.is_err() {
|
||||
self.pending.lock().await.remove(&id);
|
||||
anyhow::bail!("daemon writer closed");
|
||||
}
|
||||
match tokio::time::timeout(Duration::from_secs(5), reply_rx).await {
|
||||
Ok(Ok(env)) => Ok(env),
|
||||
Ok(Err(_)) => anyhow::bail!("reply slot dropped"),
|
||||
Err(_) => {
|
||||
self.pending.lock().await.remove(&id);
|
||||
anyhow::bail!("request timed out")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn request(&self, cmd: Cmd) -> Result<Envelope> {
|
||||
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
|
||||
let (reply_tx, reply_rx) = oneshot::channel();
|
||||
self.pending.lock().await.insert(id, reply_tx);
|
||||
self.tx.send(Envelope::Req { id, cmd }).await?;
|
||||
Ok(reply_rx.await?)
|
||||
let seen_gen = self.gen.load(Ordering::Acquire);
|
||||
let env = Envelope::Req { id, cmd };
|
||||
match self.send_once(id, env.clone()).await {
|
||||
Ok(reply) => Ok(reply),
|
||||
Err(_) => {
|
||||
// Connection likely dropped — reconnect (respawns the daemon if
|
||||
// it exited) and retry once.
|
||||
self.reconnect(seen_gen).await?;
|
||||
self.send_once(id, env).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn register_output(&self, surface_id: String, channel: Channel<Vec<u8>>) {
|
||||
|
||||
Reference in New Issue
Block a user