Add Lego Mindstorms functionality

This commit is contained in:
Chris Glencross
2020-02-24 19:48:12 +00:00
parent 2240de0085
commit 6b7619ebeb
3 changed files with 261 additions and 24 deletions

View File

@@ -8,13 +8,15 @@ only on Windows and MacOS, and cannot connect Scratch and micro:bit on Linux.
Bluepy-scratch-link allows Linux PCs to connect Scratch and micro:bit. It uses
Linux Bluetooth protocol stack [Bluez](http://www.bluez.org/) and its python
interface [bluepy](https://github.com/IanHarvey/bluepy) to handle Bluetooth Low
interfaces [pybluez](https://github.com/pybluez/pybluez) to handle Bluetooth,
and [bluepy](https://github.com/IanHarvey/bluepy) to handle Bluetooth Low
Energy, or BLE, connections with micro:bit. It is confirmed that
bluepy-scratch-link connects Scratch 3.0 and a micro:bit.
bluepy-scratch-link connects Scratch 3.0 and a micro:bit, and a Lego Mindstorms
EV3.
This is a minimal implementation to support micro:bit. Some of Scratch-link
features are not implemented. For example, Bluetooth (non-BLE) devices are not
supported. BLE device support other than micro:bit is not confirmed.
This is a minimal implementation to support micro:bit and Lego Mindstorms EV3.
It may work with other devices but these are untested. Some Scratch-link
features are not implemented.
Bluepy-scratch-link requires python version 3.6 and later to use websockets.
If your system has python older than version 3.6, install newer version. If your
@@ -27,7 +29,7 @@ feed-backs will be appreciated.
Installation
------------
1. Prepare BLE controller
1. Prepare Bluetooth/BLE controller
Confirm that your Linux PC has a Bluetooth controller with BLE support.
Bluetooth 4.0 controller supports BLE. If your PC does not have it, need
to plug USB Bluetooth 4.0 adapter.
@@ -44,14 +46,14 @@ Installation
```sh
$ sudo pip install bluepy websockets
Or if your system has python3 command,
$ sudo pip3 install bluepy websockets
$ sudo pip3 install bluepy pybluez websockets
```
4. Get bluepy-scratch-link
Example below installs bluepy-scratch-link under your home directory.
```sh
$ cd ~
$ git clone git@github.com:kawasaki/bluepy-scratch-link.git
$ git clone git@github.com:chrisglencross/bluepy-scratch-link.git
```
5. Prepare web server certificate
@@ -71,7 +73,7 @@ Installation
| tr -d '\r' > scratch-device-manager.pem
```
6. Install Scratch-link hex in micro:bit
6. If using a micro:bit, install Scratch-link hex on your device
* Download and unzip the [micro:bit Scratch Hex file](https://downloads.scratch.mit.edu/microbit/scratch-microbit-1.1.0.hex.zip).
* Flash the micro:bit over USB with the Scratch .Hex File, you will see the
five character name of the micro:bit scroll across the screen such as
@@ -79,13 +81,56 @@ Installation
Usage
-----
1. Turn on Bluetooth Low Energy controller
1. For micro:bit or other BLE devices, turn on Bluetooth Low Energy controller
```sh
$ sudo btmgmt le on
$ sudo btmgmt power on
```
2. Start scratch-link python script
2. For Lego Mindstorms EV3, pair your Linux PC to the EV3 brick.
First, turn on the EV3 and ensure Bluetooth is enabled.
Then, pair using your Linux desktop's the Bluetooth settings, for example with Gnome:
* Settings -> Bluetooth
* Click on the EV3 device name
* Accept the connection on EV3 brick
* Enter a matching PIN on EV3 brick and Linux PC
* Confirm EV3 status is "Disconnected" in Bluetooth settings
Alternatively you can perform pairing from the command-line:
```shell script
$ bluetoothctl
[bluetooth]# power on
Changing power on succeeded
[bluetooth]# pairable on
Changing pairable on succeeded
[bluetooth]# agent KeyboardOnly
Agent registered
[bluetooth]# devices
...
Device 00:16:53:53:D3:19 EV3
...
[bluetooth]# pair 00:16:53:53:D3:19
Attempting to pair with 00:16:53:53:D3:19
# Confirm pairing on the EV3 display, set PIN to 1234
Request PIN code
[agent] Enter PIN code: 1234
[CHG] Device 00:16:53:53:D3:19 Connected: yes
[CHG] Device 00:16:53:53:D3:19 Paired: yes
Pairing successful
[bluetooth]# quit
```
3. Start scratch-link python script
```sh
$ cd ~/bluepy-scratch-link
$ sudo ./scratch_link.py
@@ -93,7 +138,7 @@ Usage
$ sudo python3 ./scratch_link.py
```
3. Start Firefox or Chrome and allow local server certificate
4. Start Firefox or Chrome and allow local server certificate
* This action is required only the first time to access.
* Open Firefox or Chrome and open [https://device-manager.scratch.mit.edu:20110/](https://device-manager.scratch.mit.edu:20110/). You will see a security risk warning.
* In **Firefox**: Click "Advanced" and click "Accept Risk and Continue".
@@ -101,8 +146,8 @@ Usage
* Immediately, you will see "Failed to open a WebSocket connection". This is expected.
4. Connect scratch to micro:bit
5. Connect scratch to micro:bit or Lego Mindstorms:
* Open [Scratch 3.0](https://scratch.mit.edu/)
* Select the "Add Extension" button
* Select micro:bit extension and follow the prompts to connect micro:bit
* Select micro:bit or Lego Mindstorms EV3 extension and follow the prompts to connect
* Build your project with the extension blocks

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
websockets
bluepy
pybluez

View File

@@ -1,4 +1,6 @@
#!/usr/bin/env python
import select
import struct
"""Scratch link on bluepy"""
@@ -9,9 +11,13 @@ import websockets
import json
import base64
# for BLESession
# for Bluetooth (e.g. Lego EV3)
import bluetooth
# for BLESession (e.g. BBC micro:bit)
from bluepy.btle import Scanner, UUID, Peripheral, DefaultDelegate
from bluepy.btle import BTLEDisconnectError
import threading
import time
@@ -92,16 +98,199 @@ class Session():
logger.debug("in handle loop")
class BTSession(Session):
"""Manage a session for Bluetooh device"""
def __init__(self, websocket):
super().__init__(websocekt)
"""Manage a session for Bluetooth device"""
INITIAL = 1
DISCOVERY = 2
DISCOVERY_COMPLETE = 3
CONNECTED = 4
DONE = 5
# Split this into discovery thread and communication thread
# discovery thread should auto-terminate
class BTThread(threading.Thread):
"""
Separated thread to control notifications to Scratch.
It handles device discovery notification in DISCOVERY status
and notifications from bluetooth devices in CONNECTED status.
"""
class BTDiscoverer(bluetooth.DeviceDiscoverer):
def __init__(self, major_class, minor_class):
super().__init__()
self.major_class = major_class
self.minor_class = minor_class
self.found_devices = {}
self.done = False
def pre_inquiry(self):
self.done = False
def device_discovered(self, address, device_class, rssi, name):
logger.debug(f"Found device {name} addr={address} class={device_class} rssi={rssi}")
major_class = (device_class & 0x1F00) >> 8
minor_class = (device_class & 0xFF) >> 2
if major_class == self.major_class and minor_class == self.minor_class:
self.found_devices[address] = (name, device_class, rssi)
def inquiry_complete(self):
self.done = True
def __init__(self, session, major_device_class, minor_device_class):
threading.Thread.__init__(self)
self.session = session
self.major_device_class = major_device_class
self.minor_device_class = minor_device_class
self.cancel_discovery = False
self.ping_time = None
def discover(self):
discoverer = self.BTDiscoverer(self.major_device_class, self.minor_device_class)
discoverer.find_devices(lookup_names=True)
while self.session.status == self.session.DISCOVERY and not discoverer.done and not self.cancel_discovery:
readable = select.select([discoverer], [], [], 0.5)[0]
if discoverer in readable:
discoverer.process_event()
for addr, (device_name, device_class, rssi) in discoverer.found_devices.items():
logger.debug(f"notifying discovered {addr}: {device_name}")
params = {"rssi": rssi, 'peripheralId': addr, 'name': device_name.decode("utf-8")}
self.session.notify('didDiscoverPeripheral', params)
discoverer.found_devices.clear()
if not discoverer.done:
discoverer.cancel_inquiry()
def run(self):
while self.session.status != self.session.DONE:
logger.debug("loop in BT thread")
current_time = int(round(time.time()))
if self.session.status == self.session.DISCOVERY and not self.cancel_discovery:
logger.debug("in discovery status:")
try:
self.discover()
self.ping_time = current_time + 5
finally:
self.session.status = self.session.DISCOVERY_COMPLETE
elif self.session.status == self.session.CONNECTED:
logger.debug("in connected status:")
sock = self.session.sock
try:
ready = select.select([sock], [], [], 1)
if ready[0]:
header = sock.recv(2)
[msg_len] = struct.unpack("<H", header)
msg_data = sock.recv(msg_len)
data = header + msg_data
params = {'message': base64.standard_b64encode(data).decode('utf-8'), "encoding": "base64"}
self.session.notify('didReceiveMessage', params)
self.ping_time = current_time + 5
except Exception as e:
logger.error(e)
self.session.close()
break
# To avoid repeated lock by this single thread,
# yield CPU to other lock waiting threads.
time.sleep(0)
else:
# Nothing to do:
time.sleep(1)
# Terminate if we have lost websocket connection to Scratch (e.g. browser closed)
if self.ping_time is None or self.ping_time <= current_time:
try:
self.session.notify('ping', {})
self.ping_time = current_time + 5
except Exception as e:
logger.error(e)
self.session.close()
break
def __init__(self, websocket, loop):
super().__init__(websocket, loop)
self.status = self.INITIAL
self.sock = None
self.bt_thread = None
def close(self):
self.status = self.DONE
if self.sock:
logger.info(f"disconnect to BT socket: {self.sock}")
self.sock.close()
def __del__(self):
self.close()
def handle_request(self, method, params):
"""Handle requests from Scratch"""
logger.debug("handle request to BT device")
logger.debug(method)
if len(params) > 0:
logger.debug(params)
res = { "jsonrpc": "2.0" }
if self.status == self.INITIAL and method == 'discover':
logger.debug("Starting async discovery")
self.status = self.DISCOVERY
self.bt_thread = self.BTThread(self, params["majorDeviceClass"], params["minorDeviceClass"])
self.bt_thread.start()
res["result"] = None
elif self.status in [self.DISCOVERY, self.DISCOVERY_COMPLETE] and method == 'connect':
# Cancel discovery
while self.status == self.DISCOVERY:
logger.debug("Cancelling discovery")
self.bt_thread.cancel_discovery = True
time.sleep(1)
addr = params['peripheralId']
logger.debug(f"connecting to the BT device {addr}")
try:
self.sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
self.sock.connect((addr, 1))
logger.info(f"connected to BT device: {addr}")
except bluetooth.BluetoothError as e:
logger.error(f"failed to connect to BT device: {e}", exc_info=e)
self.status = self.DONE
self.sock = None
if self.sock:
res["result"] = None
self.status = self.CONNECTED
else:
err_msg = f"BT connect failed: {addr}"
res["error"] = { "message": err_msg }
self.status = self.DONE
elif self.status == self.CONNECTED and method == 'send':
logger.debug("handle send request")
if params['encoding'] != 'base64':
logger.error("encoding other than base 64 is not "
"yet supported: ", params['encoding'])
msg_bstr = params['message'].encode('ascii')
data = base64.standard_b64decode(msg_bstr)
self.sock.send(data)
res['result'] = len(data)
logger.debug(res)
return res
def end_request(self):
logger.debug(f"end_request of BTSession {self}")
return self.status == self.DONE
def handle(self):
logger.error("BT session handler is not implemented")
class BLESession(Session):
"""
Manage a session for Bluetooh Low Energy device such as micro:bit
Manage a session for Bluetooth Low Energy device such as micro:bit
"""
INITIAL = 1
@@ -346,13 +535,13 @@ sessionTypes = { '/scratch/ble': BLESession, '/scratch/bt': BTSession }
async def ws_handler(websocket, path):
try:
logger.info(f"Start session for web socket path: {path}");
logger.info(f"Start session for web socket path: {path}")
loop = asyncio.get_event_loop()
session = sessionTypes[path](websocket, loop)
await session.handle()
except Exception as e:
logger.error(f"Failure in session for web socket path: {path}");
logger.error(e);
logger.error(f"Failure in session for web socket path: {path}")
logger.error(e)
start_server = websockets.serve(
ws_handler, "device-manager.scratch.mit.edu", 20110, ssl=ssl_context