fix(app): abort the old reader on reconnect (fixes doubled keystroke echo)

reconnect() spawned a new reader/writer but left the previous reader task
running. A reconnect triggered while the old connection was still alive (e.g.
a request timing out during a slow daemon start) left TWO live connections;
the daemon broadcast Output to both, so every byte — including input echo —
arrived twice ("ccucurcurl"). The bridge now stores the reader's JoinHandle
and aborts it before establishing the new connection, guaranteeing a single
live reader.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-15 11:56:35 +07:00
parent ee969371c9
commit c84b96abc0
+15 -7
View File
@@ -29,6 +29,9 @@ pub struct Bridge {
tx: Mutex<mpsc::Sender<Envelope>>, tx: Mutex<mpsc::Sender<Envelope>>,
/// Serializes reconnect attempts. /// Serializes reconnect attempts.
reconnect_lock: Mutex<()>, reconnect_lock: Mutex<()>,
/// The current reader task; aborted on reconnect so a stale connection can't
/// keep delivering duplicate output (which doubled keystroke echo).
reader: Mutex<tokio::task::JoinHandle<()>>,
/// Pending request id → reply slot. /// Pending request id → reply slot.
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>, pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
/// surface id → output channel into the webview. /// surface id → output channel into the webview.
@@ -102,13 +105,13 @@ async fn spawn_connection(
app: &AppHandle, app: &AppHandle,
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>, pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>, out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>,
) -> Result<mpsc::Sender<Envelope>> { ) -> Result<(mpsc::Sender<Envelope>, tokio::task::JoinHandle<()>)> {
let stream = ensure_daemon(sock).await?; let stream = ensure_daemon(sock).await?;
let (read_half, write_half) = stream.into_split(); let (read_half, write_half) = stream.into_split();
let (tx, rx) = mpsc::channel::<Envelope>(256); let (tx, rx) = mpsc::channel::<Envelope>(256);
spawn_writer(write_half, rx); spawn_writer(write_half, rx);
spawn_reader(read_half, app.clone(), pending, out_channels); let reader = spawn_reader(read_half, app.clone(), pending, out_channels);
Ok(tx) Ok((tx, reader))
} }
impl Bridge { impl Bridge {
@@ -116,7 +119,7 @@ impl Bridge {
let sock = socket_path()?; let sock = socket_path()?;
let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>> = Arc::default(); let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>> = Arc::default();
let out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>> = Arc::default(); let out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>> = Arc::default();
let tx = spawn_connection(&sock, &app, pending.clone(), out_channels.clone()).await?; let (tx, reader) = spawn_connection(&sock, &app, pending.clone(), out_channels.clone()).await?;
Ok(Self { Ok(Self {
next_id: AtomicU64::new(1), next_id: AtomicU64::new(1),
app, app,
@@ -124,6 +127,7 @@ impl Bridge {
gen: AtomicU64::new(0), gen: AtomicU64::new(0),
tx: Mutex::new(tx), tx: Mutex::new(tx),
reconnect_lock: Mutex::new(()), reconnect_lock: Mutex::new(()),
reader: Mutex::new(reader),
pending, pending,
out_channels, out_channels,
}) })
@@ -139,8 +143,12 @@ impl Bridge {
} }
// Drop in-flight reply slots — their connection is gone; they'll error out. // Drop in-flight reply slots — their connection is gone; they'll error out.
self.pending.lock().await.clear(); self.pending.lock().await.clear();
let new_tx = spawn_connection(&self.sock, &self.app, self.pending.clone(), self.out_channels.clone()).await?; // Kill the old reader FIRST so it can't keep delivering output on a stale
// connection alongside the new one (the cause of doubled keystroke echo).
self.reader.lock().await.abort();
let (new_tx, new_reader) = spawn_connection(&self.sock, &self.app, self.pending.clone(), self.out_channels.clone()).await?;
*self.tx.lock().await = new_tx; *self.tx.lock().await = new_tx;
*self.reader.lock().await = new_reader;
self.gen.fetch_add(1, Ordering::Release); self.gen.fetch_add(1, Ordering::Release);
let _ = self.app.emit("spacesh:reconnected", ()); let _ = self.app.emit("spacesh:reconnected", ());
Ok(()) Ok(())
@@ -205,7 +213,7 @@ fn spawn_reader(
app: AppHandle, app: AppHandle,
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>, pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>, out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>,
) { ) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
match read_frame(&mut read_half).await { match read_frame(&mut read_half).await {
@@ -232,7 +240,7 @@ fn spawn_reader(
} }
} }
} }
}); })
} }
// ---- Tauri commands ---- // ---- Tauri commands ----