BLEDbusSession: implement multiple devices support and disconnection

Also implemented service search retry to stabilize the first read
operation. Next work is to create a class to gather device attirbutes
that BLEDBusSession refers.

Signed-off-by: Shin'ichiro Kawasaki <kawasaki@juno.dti.ne.jp>
This commit is contained in:
Shin'ichiro Kawasaki
2023-09-10 20:25:49 +09:00
parent 157e3458b0
commit adb746995e

View File

@@ -34,7 +34,7 @@ from sdbus_async.bluez.gatt_api import (
GattServiceInterfaceAsync,
)
from asyncio import sleep
from os import dup, fdopen
from os import dup, fdopen, close
import threading
import time
@@ -160,11 +160,17 @@ class Session():
break
await self._send_notifications()
logger.debug("in handle loop")
except websockets.ConnectionClosedError as e:
except (websockets.ConnectionClosedOK, websockets.ConnectionClosedError) as e:
logger.info("scratch closed session")
logger.debug(e)
self.close()
if type(self) is BLEDBusSession:
await self.async_close()
else:
self.close()
break
except Exception as e:
t = type(e)
logger.info(f"scratch closed with unkown exception {e}: {t}")
def close(self):
"""
@@ -188,7 +194,19 @@ class BLEDBusSession(Session):
MAX_SCANNER_IF = 3
found_devices = {}
connected_devices = {}
class Notification():
def __init__(self, loop, acquired_fd, fd, fp, params):
self.loop = loop
self.acquired_fd = acquired_fd
self.fd = fd
self.fp = fp
self.params = params
def close(self):
self.loop.remove_reader(self.fd)
self.fp.close()
def _connect_to_adapters(self):
self.iface = None
@@ -218,9 +236,9 @@ class BLEDBusSession(Session):
await self.adapter.start_discovery()
self.discovery_running = True
logger.debug(f"Task to stop discovery has got created.")
asyncio.create_task(self._find_devices())
asyncio.create_task(self._stop_discovery())
logger.debug(f"Task to stop discovery has got created.")
async def _matches(self, dev, filters):
"""
@@ -258,9 +276,23 @@ class BLEDBusSession(Session):
# ref: https://github.com/LLK/scratch-link/blob/develop/Documentation/BluetoothLE.md
return False
async def _notify_device(self, device, node_name) -> None:
params = { 'rssi': -80, 'name': 'Unknown' }
try:
params['rssi'] = await device.rssi
except Exception:
None
try:
params['name'] = await device.name
except Exception:
None
params['peripheralId'] = node_name
await self._send_notification('didDiscoverPeripheral', params)
async def _find_devices(self) -> None:
assert self.discovery_running
while self.discovery_running:
logger.info("in _find_devices: wait 1 second")
await sleep(1)
s = await self.adapter_introspect.dbus_introspect()
parser = ET.fromstring(s)
@@ -273,25 +305,20 @@ class BLEDBusSession(Session):
node_name = node.attrib['name']
logger.debug(f" {node_name}")
devpath = self.iface + "/" + node_name
if BLEDBusSession.connected_devices.get(devpath):
continue
device = DeviceInterfaceAsync()
device._connect('org.bluez', devpath, bus=self.dbus)
if not await self._matches(device, self.discover_filters):
continue
if BLEDBusSession.found_devices.get(node_name):
if self.found_devices.get(node_name):
continue
BLEDBusSession.found_devices[node_name] = device
params = { 'rssi': -80, 'name': 'Unknown' }
try:
params['rssi'] = await device.rssi
except Exception:
None
try:
params['name'] = await device.name
except Exception:
None
params['peripheralId'] = node_name
self.found_devices[node_name] = device
self.devpath = devpath
await self._send_notification('didDiscoverPeripheral', params)
self.devname = await device.name
self.devaddr = await device.address
await self._notify_device(device, node_name)
logger.debug("end _find_device.")
async def _stop_discovery(self) -> None:
@@ -310,12 +337,14 @@ class BLEDBusSession(Session):
self.discovery_running = False
self.iface = None
self.devpath = None
self.devname = None
self.devaddr = None
self.services = {}
self.chars = {}
self.chars_cache = {}
self.notification_fps = {}
self.notification_params = {}
self.notifications = {}
self._connect_to_adapters()
self.found_devices = {}
async def _get_characteristics(self, service_path):
service_introspect = DbusInterfaceCommonAsync()
@@ -339,13 +368,18 @@ class BLEDBusSession(Session):
async def _get_services(self):
# do D-Bus introspect to the device path and get service paths under it
dev_introspect = DbusInterfaceCommonAsync()
dev_introspect._connect('org.bluez', self.devpath, bus=self.dbus)
s = await dev_introspect.dbus_introspect()
parser = ET.fromstring(s)
nodes = parser.findall("./node")
for i in range(5):
dev_introspect = DbusInterfaceCommonAsync()
dev_introspect._connect('org.bluez', self.devpath, bus=self.dbus)
s = await dev_introspect.dbus_introspect()
parser = ET.fromstring(s)
nodes = parser.findall("./node")
if nodes:
break
else:
logger.error("Service not found. Try again.")
await sleep(1)
if not nodes:
logger.error("service not found")
return []
for node in nodes:
path = self.devpath + '/' + node.attrib['name']
@@ -378,21 +412,31 @@ class BLEDBusSession(Session):
async def _start_notification(self, sid, cid, char):
logger.debug('startNotification')
(fd, mtu) = await char.acquire_notify({})
self.fd = dup(fd)
fp = fdopen(self.fd, mode='rb', buffering=0, newline=None)
self.notification_fps[self.fd] = fp
self.notification_params[self.fd] = { 'serviceId': sid,
'characteristicId': cid,
'encoding': 'base64' }
self.loop.add_reader(self.fd, self._read_notification, self.fd)
logger.debug(f'added notification reader: fd={self.fd}')
(acquired_fd, mtu) = await char.acquire_notify({})
fd = dup(acquired_fd)
fp = fdopen(fd, mode='rb', buffering=0, newline=None)
self.loop.add_reader(fd, self._read_notification, fd)
notification = self.Notification(self.loop, acquired_fd, fd, fp, {
'serviceId': sid,
'characteristicId': cid,
'encoding': 'base64'
})
self.notifications[fd] = notification
logger.debug(f'added notification reader: {notification}')
def _stop_notifications(self):
for n in self.notifications.values():
n.close()
def _read_notification(self, *args):
fd = args[0]
fp = self.notification_fps[fd]
data = fp.read()
params = self.notification_params[fd].copy()
notification = self.notifications[fd]
data = notification.fp.read()
if len(data) == 0:
logger.debug(f'empty notification data')
asyncio.create_task(self.async_close())
return
params = notification.params.copy()
params['message'] = base64.standard_b64encode(data).decode('ascii')
self.loop.create_task(self._send_notification('characteristicDidChange', params))
@@ -419,13 +463,14 @@ class BLEDBusSession(Session):
elif self.status == self.DISCOVERY and method == 'connect':
logger.debug("connecting to the BLE device")
self.device = BLEDBusSession.found_devices[params['peripheralId']]
self.device = self.found_devices[params['peripheralId']]
try:
logger.debug(f" {self.device}")
await self.device.connect()
res["result"] = None
self.status = self.CONNECTED
logger.info("Connected")
logger.info(f"Connected: '{self.devname}'@{self.devaddr}")
BLEDBusSession.connected_devices[self.devpath] = self.device
except NotImplementedError as e:
logger.error(e)
res["error"] = { "message": "Failed to connect to device" }
@@ -467,6 +512,21 @@ class BLEDBusSession(Session):
logger.debug("end request")
return False
async def async_close(self):
if not self.device:
return
logger.info(f"Disconnecting from '{self.devname}'@{self.devaddr}")
self._stop_notifications()
await self.device.disconnect()
BLEDBusSession.connected_devices.pop(self.devpath)
logger.info(f"Disconnected from '{self.devname}'@{self.devaddr}")
self.device = None
self.devpath = None
self.devname = None
self.devaddr = None
await self.websocket.close()
return
def close(self):
logger.debug("close")
return