diff --git a/pyscrlink/scratch_link.py b/pyscrlink/scratch_link.py index 8e580f3..4b662bd 100755 --- a/pyscrlink/scratch_link.py +++ b/pyscrlink/scratch_link.py @@ -34,7 +34,7 @@ from sdbus_async.bluez.gatt_api import ( GattServiceInterfaceAsync, ) from asyncio import sleep -from os import dup, fdopen +from os import dup, fdopen, close import threading import time @@ -160,11 +160,17 @@ class Session(): break await self._send_notifications() logger.debug("in handle loop") - except websockets.ConnectionClosedError as e: + except (websockets.ConnectionClosedOK, websockets.ConnectionClosedError) as e: logger.info("scratch closed session") logger.debug(e) - self.close() + if type(self) is BLEDBusSession: + await self.async_close() + else: + self.close() break + except Exception as e: + t = type(e) + logger.info(f"scratch closed with unkown exception {e}: {t}") def close(self): """ @@ -188,7 +194,19 @@ class BLEDBusSession(Session): MAX_SCANNER_IF = 3 - found_devices = {} + connected_devices = {} + + class Notification(): + def __init__(self, loop, acquired_fd, fd, fp, params): + self.loop = loop + self.acquired_fd = acquired_fd + self.fd = fd + self.fp = fp + self.params = params + + def close(self): + self.loop.remove_reader(self.fd) + self.fp.close() def _connect_to_adapters(self): self.iface = None @@ -218,9 +236,9 @@ class BLEDBusSession(Session): await self.adapter.start_discovery() self.discovery_running = True - logger.debug(f"Task to stop discovery has got created.") asyncio.create_task(self._find_devices()) asyncio.create_task(self._stop_discovery()) + logger.debug(f"Task to stop discovery has got created.") async def _matches(self, dev, filters): """ @@ -258,9 +276,23 @@ class BLEDBusSession(Session): # ref: https://github.com/LLK/scratch-link/blob/develop/Documentation/BluetoothLE.md return False + async def _notify_device(self, device, node_name) -> None: + params = { 'rssi': -80, 'name': 'Unknown' } + try: + params['rssi'] = await device.rssi + except Exception: + None + try: + params['name'] = await device.name + except Exception: + None + params['peripheralId'] = node_name + await self._send_notification('didDiscoverPeripheral', params) + async def _find_devices(self) -> None: assert self.discovery_running while self.discovery_running: + logger.info("in _find_devices: wait 1 second") await sleep(1) s = await self.adapter_introspect.dbus_introspect() parser = ET.fromstring(s) @@ -273,25 +305,20 @@ class BLEDBusSession(Session): node_name = node.attrib['name'] logger.debug(f" {node_name}") devpath = self.iface + "/" + node_name + if BLEDBusSession.connected_devices.get(devpath): + continue device = DeviceInterfaceAsync() device._connect('org.bluez', devpath, bus=self.dbus) if not await self._matches(device, self.discover_filters): continue - if BLEDBusSession.found_devices.get(node_name): + if self.found_devices.get(node_name): continue - BLEDBusSession.found_devices[node_name] = device - params = { 'rssi': -80, 'name': 'Unknown' } - try: - params['rssi'] = await device.rssi - except Exception: - None - try: - params['name'] = await device.name - except Exception: - None - params['peripheralId'] = node_name + self.found_devices[node_name] = device self.devpath = devpath - await self._send_notification('didDiscoverPeripheral', params) + self.devname = await device.name + self.devaddr = await device.address + await self._notify_device(device, node_name) + logger.debug("end _find_device.") async def _stop_discovery(self) -> None: @@ -310,12 +337,14 @@ class BLEDBusSession(Session): self.discovery_running = False self.iface = None self.devpath = None + self.devname = None + self.devaddr = None self.services = {} self.chars = {} self.chars_cache = {} - self.notification_fps = {} - self.notification_params = {} + self.notifications = {} self._connect_to_adapters() + self.found_devices = {} async def _get_characteristics(self, service_path): service_introspect = DbusInterfaceCommonAsync() @@ -339,13 +368,18 @@ class BLEDBusSession(Session): async def _get_services(self): # do D-Bus introspect to the device path and get service paths under it - dev_introspect = DbusInterfaceCommonAsync() - dev_introspect._connect('org.bluez', self.devpath, bus=self.dbus) - s = await dev_introspect.dbus_introspect() - parser = ET.fromstring(s) - nodes = parser.findall("./node") + for i in range(5): + dev_introspect = DbusInterfaceCommonAsync() + dev_introspect._connect('org.bluez', self.devpath, bus=self.dbus) + s = await dev_introspect.dbus_introspect() + parser = ET.fromstring(s) + nodes = parser.findall("./node") + if nodes: + break + else: + logger.error("Service not found. Try again.") + await sleep(1) if not nodes: - logger.error("service not found") return [] for node in nodes: path = self.devpath + '/' + node.attrib['name'] @@ -378,21 +412,31 @@ class BLEDBusSession(Session): async def _start_notification(self, sid, cid, char): logger.debug('startNotification') - (fd, mtu) = await char.acquire_notify({}) - self.fd = dup(fd) - fp = fdopen(self.fd, mode='rb', buffering=0, newline=None) - self.notification_fps[self.fd] = fp - self.notification_params[self.fd] = { 'serviceId': sid, - 'characteristicId': cid, - 'encoding': 'base64' } - self.loop.add_reader(self.fd, self._read_notification, self.fd) - logger.debug(f'added notification reader: fd={self.fd}') + (acquired_fd, mtu) = await char.acquire_notify({}) + fd = dup(acquired_fd) + fp = fdopen(fd, mode='rb', buffering=0, newline=None) + self.loop.add_reader(fd, self._read_notification, fd) + notification = self.Notification(self.loop, acquired_fd, fd, fp, { + 'serviceId': sid, + 'characteristicId': cid, + 'encoding': 'base64' + }) + self.notifications[fd] = notification + logger.debug(f'added notification reader: {notification}') + + def _stop_notifications(self): + for n in self.notifications.values(): + n.close() def _read_notification(self, *args): fd = args[0] - fp = self.notification_fps[fd] - data = fp.read() - params = self.notification_params[fd].copy() + notification = self.notifications[fd] + data = notification.fp.read() + if len(data) == 0: + logger.debug(f'empty notification data') + asyncio.create_task(self.async_close()) + return + params = notification.params.copy() params['message'] = base64.standard_b64encode(data).decode('ascii') self.loop.create_task(self._send_notification('characteristicDidChange', params)) @@ -419,13 +463,14 @@ class BLEDBusSession(Session): elif self.status == self.DISCOVERY and method == 'connect': logger.debug("connecting to the BLE device") - self.device = BLEDBusSession.found_devices[params['peripheralId']] + self.device = self.found_devices[params['peripheralId']] try: logger.debug(f" {self.device}") await self.device.connect() res["result"] = None self.status = self.CONNECTED - logger.info("Connected") + logger.info(f"Connected: '{self.devname}'@{self.devaddr}") + BLEDBusSession.connected_devices[self.devpath] = self.device except NotImplementedError as e: logger.error(e) res["error"] = { "message": "Failed to connect to device" } @@ -467,6 +512,21 @@ class BLEDBusSession(Session): logger.debug("end request") return False + async def async_close(self): + if not self.device: + return + logger.info(f"Disconnecting from '{self.devname}'@{self.devaddr}") + self._stop_notifications() + await self.device.disconnect() + BLEDBusSession.connected_devices.pop(self.devpath) + logger.info(f"Disconnected from '{self.devname}'@{self.devaddr}") + self.device = None + self.devpath = None + self.devname = None + self.devaddr = None + await self.websocket.close() + return + def close(self): logger.debug("close") return