Merge fix-double-echo: single reader across reconnects
This commit is contained in:
@@ -29,6 +29,9 @@ pub struct Bridge {
|
|||||||
tx: Mutex<mpsc::Sender<Envelope>>,
|
tx: Mutex<mpsc::Sender<Envelope>>,
|
||||||
/// Serializes reconnect attempts.
|
/// Serializes reconnect attempts.
|
||||||
reconnect_lock: Mutex<()>,
|
reconnect_lock: Mutex<()>,
|
||||||
|
/// The current reader task; aborted on reconnect so a stale connection can't
|
||||||
|
/// keep delivering duplicate output (which doubled keystroke echo).
|
||||||
|
reader: Mutex<tokio::task::JoinHandle<()>>,
|
||||||
/// Pending request id → reply slot.
|
/// Pending request id → reply slot.
|
||||||
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
||||||
/// surface id → output channel into the webview.
|
/// surface id → output channel into the webview.
|
||||||
@@ -102,13 +105,13 @@ async fn spawn_connection(
|
|||||||
app: &AppHandle,
|
app: &AppHandle,
|
||||||
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
||||||
out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>,
|
out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>,
|
||||||
) -> Result<mpsc::Sender<Envelope>> {
|
) -> Result<(mpsc::Sender<Envelope>, tokio::task::JoinHandle<()>)> {
|
||||||
let stream = ensure_daemon(sock).await?;
|
let stream = ensure_daemon(sock).await?;
|
||||||
let (read_half, write_half) = stream.into_split();
|
let (read_half, write_half) = stream.into_split();
|
||||||
let (tx, rx) = mpsc::channel::<Envelope>(256);
|
let (tx, rx) = mpsc::channel::<Envelope>(256);
|
||||||
spawn_writer(write_half, rx);
|
spawn_writer(write_half, rx);
|
||||||
spawn_reader(read_half, app.clone(), pending, out_channels);
|
let reader = spawn_reader(read_half, app.clone(), pending, out_channels);
|
||||||
Ok(tx)
|
Ok((tx, reader))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Bridge {
|
impl Bridge {
|
||||||
@@ -116,7 +119,7 @@ impl Bridge {
|
|||||||
let sock = socket_path()?;
|
let sock = socket_path()?;
|
||||||
let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>> = Arc::default();
|
let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>> = Arc::default();
|
||||||
let out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>> = Arc::default();
|
let out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>> = Arc::default();
|
||||||
let tx = spawn_connection(&sock, &app, pending.clone(), out_channels.clone()).await?;
|
let (tx, reader) = spawn_connection(&sock, &app, pending.clone(), out_channels.clone()).await?;
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
next_id: AtomicU64::new(1),
|
next_id: AtomicU64::new(1),
|
||||||
app,
|
app,
|
||||||
@@ -124,6 +127,7 @@ impl Bridge {
|
|||||||
gen: AtomicU64::new(0),
|
gen: AtomicU64::new(0),
|
||||||
tx: Mutex::new(tx),
|
tx: Mutex::new(tx),
|
||||||
reconnect_lock: Mutex::new(()),
|
reconnect_lock: Mutex::new(()),
|
||||||
|
reader: Mutex::new(reader),
|
||||||
pending,
|
pending,
|
||||||
out_channels,
|
out_channels,
|
||||||
})
|
})
|
||||||
@@ -139,8 +143,12 @@ impl Bridge {
|
|||||||
}
|
}
|
||||||
// Drop in-flight reply slots — their connection is gone; they'll error out.
|
// Drop in-flight reply slots — their connection is gone; they'll error out.
|
||||||
self.pending.lock().await.clear();
|
self.pending.lock().await.clear();
|
||||||
let new_tx = spawn_connection(&self.sock, &self.app, self.pending.clone(), self.out_channels.clone()).await?;
|
// Kill the old reader FIRST so it can't keep delivering output on a stale
|
||||||
|
// connection alongside the new one (the cause of doubled keystroke echo).
|
||||||
|
self.reader.lock().await.abort();
|
||||||
|
let (new_tx, new_reader) = spawn_connection(&self.sock, &self.app, self.pending.clone(), self.out_channels.clone()).await?;
|
||||||
*self.tx.lock().await = new_tx;
|
*self.tx.lock().await = new_tx;
|
||||||
|
*self.reader.lock().await = new_reader;
|
||||||
self.gen.fetch_add(1, Ordering::Release);
|
self.gen.fetch_add(1, Ordering::Release);
|
||||||
let _ = self.app.emit("spacesh:reconnected", ());
|
let _ = self.app.emit("spacesh:reconnected", ());
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -205,7 +213,7 @@ fn spawn_reader(
|
|||||||
app: AppHandle,
|
app: AppHandle,
|
||||||
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Envelope>>>>,
|
||||||
out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>,
|
out_channels: Arc<Mutex<HashMap<String, Channel<Vec<u8>>>>>,
|
||||||
) {
|
) -> tokio::task::JoinHandle<()> {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
match read_frame(&mut read_half).await {
|
match read_frame(&mut read_half).await {
|
||||||
@@ -232,7 +240,7 @@ fn spawn_reader(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Tauri commands ----
|
// ---- Tauri commands ----
|
||||||
|
|||||||
Reference in New Issue
Block a user