From 22d5b04d20244178178a87085cc1cec7e1357978 Mon Sep 17 00:00:00 2001 From: Shin'ichiro Kawasaki Date: Sun, 7 Jun 2020 11:28:28 +0900 Subject: [PATCH] Session, BLESession: Avoid websocket send job from BLE thread During trials to make Lego Boost work with bluepy-scratch-link, deadlock race was observed between the lock for write to BLE device and waitForNotifcations to BLE device. The cause of the dead lock is coroutine call by BLE delegate handler to send out notifications to Scratch through websocket conflict with websocket receive call by websocket loop thread. To avoid the conflict, only allow websocket loop thread to send out through websocket. Have BLE thread to queue notifications so that it does not need to touch websocket. On the other hand, introduce timeout when websocket loop thread receives message from Scratch. This allow the websocket loop thread to check the notifications queued. If any notification is queued, the websocket loop thread sends it to Scratch. Also fix notify() function argument signature, which was broken by commit ba8eba4 ("BLESession: Introduce notification queue"). Signed-off-by: Shin'ichiro Kawasaki --- scratch_link.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/scratch_link.py b/scratch_link.py index cfe1015..5c6361a 100755 --- a/scratch_link.py +++ b/scratch_link.py @@ -61,12 +61,15 @@ class Session(): Return True when the sessino should end. """ logger.debug("start recv_request") - req = await self.websocket.recv() + try: + req = await asyncio.wait_for(self.websocket.recv(), 0.0001) + except asyncio.TimeoutError: + return False logger.debug(f"request: {req}") jsonreq = json.loads(req) if jsonreq['jsonrpc'] != '2.0': - logger.error("error: jsonrpc versino is not 2.0") - return + logger.error("error: jsonrpc version is not 2.0") + return True jsonres = self.handle_request(jsonreq['method'], jsonreq['params']) if 'id' in jsonreq: jsonres['id'] = jsonreq['id'] @@ -90,7 +93,10 @@ class Session(): logger.debug("default end_request") return False - def notify(self): + def notify(self, key, params): + self.notification_queue.put((key, params)) + + async def _send_notifications(self): """ Notify BT/BLE device events to scratch. """ @@ -98,17 +104,14 @@ class Session(): # flush notification queue while not self.notification_queue.empty(): method, params = self.notification_queue.get() - self._send_notification(method, params) + await self._send_notification(method, params) - def _send_notification(self, method, params): + async def _send_notification(self, method, params): jsonn = { 'jsonrpc': "2.0", 'method': method } jsonn['params'] = params notification = json.dumps(jsonn) logger.debug(f"notification: {notification}") - - future = asyncio.run_coroutine_threadsafe( - self.websocket.send(notification), self.loop) - result = future.result() + await self.websocket.send(notification) async def handle(self): logger.debug("start session hanlder") @@ -117,6 +120,7 @@ class Session(): while True: if await self.recv_request(): break + await self._send_notifications() logger.debug("in handle loop") class BTSession(Session): @@ -343,8 +347,7 @@ class BLESession(Session): params = { 'rssi': d.rssi } params['peripheralId'] = devices.index(d) params['name'] = d.getValueText(0x9) - self.session.notification_queue.put(('didDiscoverPeripheral', params)) - self.session.notify() + self.session.notify('didDiscoverPeripheral', params) time.sleep(1) elif self.session.status == self.session.CONNECTED: logger.debug("in connected status:") @@ -370,7 +373,7 @@ class BLESession(Session): time.sleep(0) else: # Nothing to do: - time.sleep(1) + time.sleep(0) class BLEDelegate(DefaultDelegate): """ @@ -394,10 +397,7 @@ class BLESession(Session): logger.debug(f"BLE notification: {handle} {data}") params = self.handles[handle].copy() params['message'] = base64.standard_b64encode(data).decode('ascii') - self.session.notification_queue.put(('characteristicDidChange', params)) - if not self.restart_notification_event.is_set(): - return - self.session.notify() + self.session.notify('characteristicDidChange', params) def __init__(self, websocket, loop): super().__init__(websocket, loop)