Merge bridge-reconnect: GUI self-heals after daemon restart
This commit is contained in:
+85
-13
@@ -2,6 +2,7 @@ use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use base64::Engine;
|
||||
@@ -18,8 +19,16 @@ use tokio::sync::{mpsc, oneshot, Mutex};
|
||||
|
||||
pub struct Bridge {
|
||||
next_id: AtomicU64,
|
||||
/// Outbound frames to the daemon.
|
||||
tx: mpsc::Sender<Envelope>,
|
||||
/// For respawning/reconnecting the daemon connection after it drops.
|
||||
app: AppHandle,
|
||||
sock: PathBuf,
|
||||
/// Bumped on every successful reconnect; lets concurrent failing requests
|
||||
/// collapse into a single reconnect (single-flight).
|
||||
gen: AtomicU64,
|
||||
/// Outbound frames to the daemon. Swapped on reconnect.
|
||||
tx: Mutex<mpsc::Sender<Envelope>>,
|
||||
/// Serializes reconnect attempts.
|
||||
reconnect_lock: Mutex<()>,
|
||||
/// Pending request id → reply slot.
|
||||
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
||||
/// surface id → output channel into the webview.
|
||||
@@ -85,28 +94,91 @@ async fn ensure_daemon(sock: &PathBuf) -> Result<UnixStream> {
|
||||
anyhow::bail!("daemon spawned ({}) but did not bind {} in time", daemon.display(), sock.display())
|
||||
}
|
||||
|
||||
/// Connect (spawning the daemon if needed) and start the reader/writer tasks,
|
||||
/// returning the outbound sender. Shared `pending`/`out_channels` are reused so
|
||||
/// replies and live output keep routing across reconnects.
|
||||
async fn spawn_connection(
|
||||
sock: &PathBuf,
|
||||
app: &AppHandle,
|
||||
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
||||
out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>,
|
||||
) -> Result<mpsc::Sender<Envelope>> {
|
||||
let stream = ensure_daemon(sock).await?;
|
||||
let (read_half, write_half) = stream.into_split();
|
||||
let (tx, rx) = mpsc::channel::<Envelope>(256);
|
||||
spawn_writer(write_half, rx);
|
||||
spawn_reader(read_half, app.clone(), pending, out_channels);
|
||||
Ok(tx)
|
||||
}
|
||||
|
||||
impl Bridge {
|
||||
pub async fn connect(app: AppHandle) -> Result<Self> {
|
||||
let sock = socket_path()?;
|
||||
let stream = ensure_daemon(&sock).await?;
|
||||
let (read_half, write_half) = stream.into_split();
|
||||
|
||||
let (tx, rx) = mpsc::channel::<Envelope>(256);
|
||||
let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>> = Arc::default();
|
||||
let out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>> = Arc::default();
|
||||
let tx = spawn_connection(&sock, &app, pending.clone(), out_channels.clone()).await?;
|
||||
Ok(Self {
|
||||
next_id: AtomicU64::new(1),
|
||||
app,
|
||||
sock,
|
||||
gen: AtomicU64::new(0),
|
||||
tx: Mutex::new(tx),
|
||||
reconnect_lock: Mutex::new(()),
|
||||
pending,
|
||||
out_channels,
|
||||
})
|
||||
}
|
||||
|
||||
spawn_writer(write_half, rx);
|
||||
spawn_reader(read_half, app, pending.clone(), out_channels.clone());
|
||||
/// Re-establish the daemon connection. Single-flight: callers pass the `gen`
|
||||
/// they observed; if another caller already reconnected (gen advanced), this
|
||||
/// is a no-op so we never open duplicate connections.
|
||||
async fn reconnect(&self, seen_gen: u64) -> Result<()> {
|
||||
let _guard = self.reconnect_lock.lock().await;
|
||||
if self.gen.load(Ordering::Acquire) != seen_gen {
|
||||
return Ok(());
|
||||
}
|
||||
// Drop in-flight reply slots — their connection is gone; they'll error out.
|
||||
self.pending.lock().await.clear();
|
||||
let new_tx = spawn_connection(&self.sock, &self.app, self.pending.clone(), self.out_channels.clone()).await?;
|
||||
*self.tx.lock().await = new_tx;
|
||||
self.gen.fetch_add(1, Ordering::Release);
|
||||
let _ = self.app.emit("spacesh:reconnected", ());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Ok(Self { next_id: AtomicU64::new(1), tx, pending, out_channels })
|
||||
/// Send one request and await its reply with a timeout. Errors if the writer
|
||||
/// is gone, the reply slot is dropped, or no reply arrives in time.
|
||||
async fn send_once(&self, id: u64, env: Envelope) -> Result<Envelope> {
|
||||
let (reply_tx, reply_rx) = oneshot::channel();
|
||||
self.pending.lock().await.insert(id, reply_tx);
|
||||
let tx = self.tx.lock().await.clone();
|
||||
if tx.send(env).await.is_err() {
|
||||
self.pending.lock().await.remove(&id);
|
||||
anyhow::bail!("daemon writer closed");
|
||||
}
|
||||
match tokio::time::timeout(Duration::from_secs(5), reply_rx).await {
|
||||
Ok(Ok(env)) => Ok(env),
|
||||
Ok(Err(_)) => anyhow::bail!("reply slot dropped"),
|
||||
Err(_) => {
|
||||
self.pending.lock().await.remove(&id);
|
||||
anyhow::bail!("request timed out")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn request(&self, cmd: Cmd) -> Result<Envelope> {
|
||||
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
|
||||
let (reply_tx, reply_rx) = oneshot::channel();
|
||||
self.pending.lock().await.insert(id, reply_tx);
|
||||
self.tx.send(Envelope::Req { id, cmd }).await?;
|
||||
Ok(reply_rx.await?)
|
||||
let seen_gen = self.gen.load(Ordering::Acquire);
|
||||
let env = Envelope::Req { id, cmd };
|
||||
match self.send_once(id, env.clone()).await {
|
||||
Ok(reply) => Ok(reply),
|
||||
Err(_) => {
|
||||
// Connection likely dropped — reconnect (respawns the daemon if
|
||||
// it exited) and retry once.
|
||||
self.reconnect(seen_gen).await?;
|
||||
self.send_once(id, env).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn register_output(&self, surface_id: String, channel: Channel<Vec<u8>>) {
|
||||
|
||||
+13
-2
@@ -37,6 +37,9 @@ export function App() {
|
||||
const [sidebarOpen, setSidebarOpen] = useState(() => loadFlag("spacesh.sidebarOpen", true));
|
||||
const [health, setHealth] = useState<DaemonHealth | null>(null);
|
||||
const [config, setConfigState] = useState<ConfigView | null>(null);
|
||||
// Bumped when the daemon connection is re-established; used to remount the
|
||||
// layout so terminals re-attach (snapshot + live stream) to the restarted daemon.
|
||||
const [connEpoch, setConnEpoch] = useState(0);
|
||||
const [connected, setConnected] = useState(false);
|
||||
const [focusedId, setFocusedId] = useState<string | null>(null);
|
||||
const [searchSurfaceId, setSearchSurfaceId] = useState<string | null>(null);
|
||||
@@ -112,7 +115,15 @@ export function App() {
|
||||
void loadHealth();
|
||||
void getConfig().then((c) => { setConfigState(c); applyTheme(c.theme, c.accent); }).catch(() => {});
|
||||
});
|
||||
return () => { void unlisten.then((f) => f()); void reconnect.then((f) => f()); };
|
||||
const reconnected = onDaemonRawEvent("spacesh:reconnected", () => {
|
||||
setConnected(true);
|
||||
setConnEpoch((n) => n + 1); // remount layout → terminals re-attach to the new daemon
|
||||
void refresh();
|
||||
void seedEvents();
|
||||
void loadHealth();
|
||||
void getConfig().then((c) => { setConfigState(c); applyTheme(c.theme, c.accent); }).catch(() => {});
|
||||
});
|
||||
return () => { void unlisten.then((f) => f()); void reconnect.then((f) => f()); void reconnected.then((f) => f()); };
|
||||
}, [refresh, seedEvents, loadHealth]);
|
||||
|
||||
useEffect(() => {
|
||||
@@ -158,7 +169,7 @@ export function App() {
|
||||
)}
|
||||
<div style={{ flex: 1, minHeight: 0, position: "relative" }}>
|
||||
{active
|
||||
? <LayoutEngine workspaceId={active.id} layout={active.layout} running={running} states={states} surfaces={active.surfaces} focusedId={effectiveFocus} onFocus={setFocusedId} zoomed={active.zoomed} searchSurfaceId={searchSurfaceId} searchNonce={searchNonce} onCloseSearch={() => setSearchSurfaceId(null)} font={termFont} palette={termPalette} />
|
||||
? <LayoutEngine key={connEpoch} workspaceId={active.id} layout={active.layout} running={running} states={states} surfaces={active.surfaces} focusedId={effectiveFocus} onFocus={setFocusedId} zoomed={active.zoomed} searchSurfaceId={searchSurfaceId} searchNonce={searchNonce} onCloseSearch={() => setSearchSurfaceId(null)} font={termFont} palette={termPalette} />
|
||||
: <div style={{ color: COLORS.textMuted, padding: 24 }}>No workspace — create one to begin.</div>}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { useEffect, useRef, useState } from "react";
|
||||
import { COLORS, FONT, ACCENTS } from "./theme";
|
||||
import { setConfig, shutdownDaemon, restartDaemon } from "./socketBridge";
|
||||
import { setConfig, restartDaemon } from "./socketBridge";
|
||||
import type { ConfigView, DaemonHealth } from "./socketBridge";
|
||||
|
||||
const FONTS = ["JetBrains Mono", "Menlo", "Monaco", "SF Mono", "Fira Code", "Cascadia Code"];
|
||||
@@ -71,7 +71,7 @@ function fmtUptime(ms: number): string {
|
||||
}
|
||||
|
||||
function DaemonSection({ health, onReload }: { health: DaemonHealth | null; onReload: () => void }) {
|
||||
const [confirm, setConfirm] = useState<null | "stop" | "restart">(null);
|
||||
const [confirm, setConfirm] = useState(false);
|
||||
// Tick so uptime counts up live while the modal is open.
|
||||
const [, setTick] = useState(0);
|
||||
useEffect(() => {
|
||||
@@ -88,19 +88,18 @@ function DaemonSection({ health, onReload }: { health: DaemonHealth | null; onRe
|
||||
</>) : <div>offline</div>}
|
||||
</div>
|
||||
<div style={{ display: "flex", gap: 8, marginTop: 12 }}>
|
||||
<button onClick={() => setConfirm("restart")} style={{ padding: "7px 14px", background: COLORS.bgElevated, color: COLORS.textPrimary, border: `1px solid ${COLORS.borderStrong}`, borderRadius: 7, fontSize: 13 }}>Restart</button>
|
||||
<button onClick={() => setConfirm("stop")} style={{ padding: "7px 14px", background: "transparent", color: COLORS.stError, border: `1px solid ${COLORS.stError}`, borderRadius: 7, fontSize: 13 }}>Stop</button>
|
||||
<button onClick={() => setConfirm(true)} style={{ padding: "7px 14px", background: COLORS.bgElevated, color: COLORS.textPrimary, border: `1px solid ${COLORS.borderStrong}`, borderRadius: 7, fontSize: 13 }}>Restart</button>
|
||||
</div>
|
||||
{confirm && (
|
||||
<div style={{ marginTop: 10, padding: 10, borderRadius: 8, background: COLORS.bgPanel, border: `1px solid ${COLORS.borderStrong}` }}>
|
||||
<div style={{ fontSize: 12, color: COLORS.textSecondary, marginBottom: 8 }}>
|
||||
{confirm === "stop" ? "Stop the daemon? All sessions end." : "Restart the daemon? Sessions end and respawn."}
|
||||
Restart the daemon? Running sessions end and respawn; panels re-attach automatically.
|
||||
</div>
|
||||
<div style={{ display: "flex", gap: 8, justifyContent: "flex-end" }}>
|
||||
<button onClick={() => setConfirm(null)} style={{ padding: "5px 12px", background: COLORS.bgElevated, color: COLORS.textPrimary, border: `1px solid ${COLORS.borderStrong}`, borderRadius: 6, fontSize: 12 }}>Cancel</button>
|
||||
<button onClick={() => { const c = confirm; setConfirm(null); void (c === "stop" ? shutdownDaemon() : restartDaemon()).then(onReload); }}
|
||||
<button onClick={() => setConfirm(false)} style={{ padding: "5px 12px", background: COLORS.bgElevated, color: COLORS.textPrimary, border: `1px solid ${COLORS.borderStrong}`, borderRadius: 6, fontSize: 12 }}>Cancel</button>
|
||||
<button onClick={() => { setConfirm(false); void restartDaemon().then(onReload); }}
|
||||
style={{ padding: "5px 12px", background: COLORS.stError, color: "#fff", border: "none", borderRadius: 6, fontSize: 12, fontWeight: 600 }}>
|
||||
{confirm === "stop" ? "Stop" : "Restart"}
|
||||
Restart
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
Reference in New Issue
Block a user