Skip to content

Commit 77062af

Browse files
committed
feat: More complete disconnect and notif support.
* Properly propagate disconnect from Rust to TS. * Add more code to exercise notification readable.
1 parent 49bfffa commit 77062af

File tree

6 files changed

+123
-60
lines changed

6 files changed

+123
-60
lines changed

src-tauri/src/transport/commands.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct AvailableDevice {
1818
pub id: String
1919
}
2020

21-
#[derive(Default)]
21+
#[derive(Debug, Default)]
2222
pub struct ActiveConnection<'a> { pub conn: Mutex<Option<Box<dyn Sink<Vec<u8>, Error = SendError> + Unpin + Send + 'a>>> }
2323

2424
#[command]

src-tauri/src/transport/gatt.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,16 @@ pub async fn gatt_connect(
5151
// Need to keep adapter from being dropped while active/connected
5252
let a = adapter;
5353

54-
while let Some(Ok(vn)) = n.next().await {
55-
use tauri::Manager;
54+
use tauri::Manager;
5655

56+
while let Some(Ok(vn)) = n.next().await {
5757
app_handle.emit("connection_data", vn.clone());
5858
}
59+
60+
let state = app_handle.state::<super::commands::ActiveConnection>();
61+
*state.conn.lock().await = None;
62+
63+
app_handle.emit("connection_disconnected", ());
5964
}
6065
});
6166

@@ -89,10 +94,10 @@ pub async fn gatt_list_devices() -> Result<Vec<super::commands::AvailableDevice>
8994
.take_until(async_std::task::sleep(Duration::from_secs(2)))
9095
.filter_map(|d| ready(d.ok()))
9196
.then(move |device| async move {
92-
let label = device.name_async().await.unwrap_or("Unknown".to_string());
93-
let id = serde_json::to_string(&device.id()).unwrap();
97+
let label = device.name_async().await.unwrap_or("Unknown".to_string());
98+
let id = serde_json::to_string(&device.id()).unwrap();
9499

95-
super::commands::AvailableDevice { label, id }
100+
super::commands::AvailableDevice { label, id }
96101
})
97102
.collect::<Vec<_>>()
98103
.await;

src-tauri/src/transport/serial.rs

Lines changed: 67 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,87 @@
1-
21
use blocking::unblock;
3-
use futures::stream;
42
use futures::channel::mpsc::channel;
5-
use futures::StreamExt;
63
use futures::lock::Mutex;
4+
use futures::stream;
5+
use futures::StreamExt;
76

8-
use std::{
9-
time::Duration,
10-
};
7+
use std::time::Duration;
118

12-
use tokio::io::{ AsyncWriteExt, AsyncReadExt };
13-
use tokio_serial::{available_ports, SerialPort, SerialPortType, SerialPortBuilderExt};
9+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
10+
use tokio_serial::{available_ports, SerialPort, SerialPortBuilderExt, SerialPortType};
1411

1512
use serde::{Deserialize, Serialize};
1613
use tauri::{
17-
AppHandle,
18-
command,
19-
ipc::{Request, Response},
20-
State, Window,
14+
command,
15+
ipc::{Request, Response},
16+
AppHandle, State, Window,
2117
};
2218

2319
const READ_BUF_SIZE: usize = 1024;
2420

2521
#[command]
26-
pub async fn serial_connect(id: String, app_handle: AppHandle, state: State<'_, super::commands::ActiveConnection<'_>>) -> Result<bool, ()> {
27-
match tokio_serial::new(id, 9600).open_native_async() {
28-
Ok(mut port) => {
29-
#[cfg(unix)]
30-
port.set_exclusive(false)
31-
.expect("Unable to set serial port exclusive to false");
32-
33-
let (mut reader, mut writer) = tokio::io::split(port);
34-
35-
let (send, mut recv) = channel(5);
36-
*state.conn.lock().await = Some(Box::new(send));
37-
tauri::async_runtime::spawn(async move {
38-
while let Some(data) = recv.next().await {
39-
writer.write(&data).await;
40-
}
41-
});
42-
43-
tauri::async_runtime::spawn(async move {
44-
let mut buffer = vec![0; READ_BUF_SIZE];
45-
while let Ok(size) = reader.read(&mut buffer).await {
46-
if size > 0 {
47-
use tauri::Manager;
48-
app_handle.emit("connection_data", &buffer[..size]);
22+
pub async fn serial_connect(
23+
id: String,
24+
app_handle: AppHandle,
25+
state: State<'_, super::commands::ActiveConnection<'_>>,
26+
) -> Result<bool, ()> {
27+
match tokio_serial::new(id, 9600).open_native_async() {
28+
Ok(mut port) => {
29+
#[cfg(unix)]
30+
port.set_exclusive(false)
31+
.expect("Unable to set serial port exclusive to false");
32+
33+
let (mut reader, mut writer) = tokio::io::split(port);
34+
35+
let (send, mut recv) = channel(5);
36+
*state.conn.lock().await = Some(Box::new(send));
37+
tauri::async_runtime::spawn(async move {
38+
while let Some(data) = recv.next().await {
39+
writer.write(&data).await;
4940
}
50-
}
51-
});
52-
53-
Ok(true)
54-
},
55-
Err(_) => Err(())
56-
}
41+
});
42+
43+
tauri::async_runtime::spawn(async move {
44+
use tauri::Manager;
45+
46+
let mut buffer = vec![0; READ_BUF_SIZE];
47+
while let Ok(size) = reader.read(&mut buffer).await {
48+
if size > 0 {
49+
app_handle.emit("connection_data", &buffer[..size]);
50+
} else {
51+
break;
52+
}
53+
}
54+
55+
let state = app_handle.state::<super::commands::ActiveConnection>();
56+
*state.conn.lock().await = None;
57+
58+
app_handle.emit("connection_disconnected", ());
59+
});
60+
61+
Ok(true)
62+
}
63+
Err(_) => Err(()),
64+
}
5765
}
5866

5967
#[command]
60-
pub async fn serial_list_devices(
61-
) -> Result<Vec<super::commands::AvailableDevice>, ()> {
62-
let ports = unblock(|| available_ports()).await.unwrap();
68+
pub async fn serial_list_devices() -> Result<Vec<super::commands::AvailableDevice>, ()> {
69+
let ports = unblock(|| available_ports()).await.unwrap();
6370

64-
let candidates = ports.into_iter().filter_map(|pi| {println!("port {:?}", pi); match pi.port_type { SerialPortType::UsbPort(u) => Some(super::commands::AvailableDevice {id: pi.port_name, label: u.product.unwrap_or("TODO".to_string())}), _ => None }}).collect();
71+
let candidates = ports
72+
.into_iter()
73+
.filter_map(|pi| {
74+
println!("port {:?}", pi);
75+
match pi.port_type {
76+
SerialPortType::UsbPort(u) => Some(super::commands::AvailableDevice {
77+
id: pi.port_name,
78+
label: u.product.unwrap_or("TODO".to_string()),
79+
}),
80+
_ => None,
81+
}
82+
})
83+
.collect();
6584

66-
println!("Candidates {:?}", candidates);
67-
Ok(candidates)
85+
println!("Candidates {:?}", candidates);
86+
Ok(candidates)
6887
}
69-

src/RpcTest.tsx

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { useState, Dispatch } from 'react';
44
import { create_rpc_connection, call_rpc, Request } from "ts-zmk-rpc-core";
55

66
import type { RpcTransport } from "ts-zmk-rpc-core/transport/index";
7+
import type { Notification } from "ts-zmk-rpc-core/transport/core";
78
import type { PhysicalLayout, Keymap } from "ts-zmk-rpc-core/keymap";
89
import type { GetBehaviorDetailsResponse } from "ts-zmk-rpc-core/behaviors";
910
import { connect as gatt_connect } from "ts-zmk-rpc-core/transport/gatt";
@@ -30,6 +31,26 @@ const TRANSPORTS: TransportFactory[] = [
3031
... window.__TAURI_INTERNALS__ ? [{ label: "Serial", pick_and_connect: { connect: tauri_serial_connect, list: serial_list_devices }}] : [],
3132
];
3233

34+
async function listen_for_notifications(notification_stream: ReadableStream<Notification>): Promise<void> {
35+
let reader = notification_stream.getReader();
36+
do {
37+
try {
38+
let { done, value } = await reader.read();
39+
if (done) {
40+
break;
41+
}
42+
43+
// TODO: Do something with the notifications
44+
console.log("Notification", value);
45+
} catch (e) {
46+
reader.releaseLock();
47+
throw e;
48+
}
49+
} while (true)
50+
51+
reader.releaseLock();
52+
}
53+
3354
async function test(factory: TransportFactory, setPhysicalLayout: Dispatch<PhysicalLayout | undefined>, setKeymap: Dispatch<Keymap | undefined>, setBehaviors: Dispatch<BehaviorMap>) {
3455
let transport = null;
3556
if (factory.connect) {
@@ -46,6 +67,11 @@ async function test(factory: TransportFactory, setPhysicalLayout: Dispatch<Physi
4667

4768
let rpc_conn = await create_rpc_connection(transport);
4869

70+
listen_for_notifications(rpc_conn.notification_readable).then(() => {
71+
console.log("NO MORE NOTPIFICATIONS");
72+
// TODO: Process disconnect here!
73+
});
74+
4975
// let req: Request = { core: { getLockState: true }, requestId: 0 };
5076
// let resp = await call_rpc(rpc_conn, req);
5177

src/tauri/ble.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,17 @@ export async function connect(dev: AvailableDevice): Promise<RpcTransport> {
2121

2222
let { writable: response_writable, readable } = new TransformStream();
2323

24-
/* const unlisten = */ await listen('connection_data', async (event: { payload: Array<number> }) => {
24+
const unlisten_data = await listen('connection_data', async (event: { payload: Array<number> }) => {
2525
let writer = response_writable.getWriter();
2626
await writer.write(new Uint8Array(event.payload));
2727
writer.releaseLock();
28-
})
28+
});
29+
30+
const unlisten_disconnected = await listen('connection_disconnected', async (event: any) => {
31+
unlisten_data();
32+
unlisten_disconnected();
33+
readable.cancel();
34+
});
2935

3036
return { readable, writable };
3137
}

src/tauri/serial.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,20 @@ export async function connect(dev: AvailableDevice): Promise<RpcTransport> {
2121

2222
let { writable: response_writable, readable } = new TransformStream();
2323

24-
/* const unlisten = */ await listen('connection_data', async (event: { payload: Array<number> }) => {
25-
console.log(event);
24+
console.log(response_writable);
25+
console.log(readable);
26+
27+
const unlisten_data = await listen('connection_data', async (event: { payload: Array<number> }) => {
2628
let writer = response_writable.getWriter();
2729
await writer.write(new Uint8Array(event.payload));
2830
writer.releaseLock();
29-
})
31+
});
32+
33+
const unlisten_disconnected = await listen('connection_disconnected', async (event: any) => {
34+
unlisten_data();
35+
unlisten_disconnected();
36+
response_writable.close();
37+
});
3038

3139
return { readable, writable };
3240
}

0 commit comments

Comments
 (0)