tools/mpremote: Add romfs query, build and deploy commands.

These commands use the `vfs.rom_ioctl()` function to manage the ROM
partitions on a device, and create and deploy ROMFS images.

Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
Damien George
2022-03-04 10:57:46 +11:00
parent 840b641024
commit 0c98c60b68
4 changed files with 385 additions and 1 deletions

View File

@@ -78,6 +78,7 @@ The full list of supported commands are:
- `mip <mpremote_command_mip>`
- `mount <mpremote_command_mount>`
- `unmount <mpremote_command_unmount>`
- `romfs <mpremote_command_romfs>`
- `rtc <mpremote_command_rtc>`
- `sleep <mpremote_command_sleep>`
- `reset <mpremote_command_reset>`
@@ -347,6 +348,29 @@ The full list of supported commands are:
This happens automatically when ``mpremote`` terminates, but it can be used
in a sequence to unmount an earlier mount before subsequent command are run.
.. _mpremote_command_romfs:
- **romfs** -- manage ROMFS partitions on the device:
.. code-block:: bash
$ mpremote romfs <sub-command>
``<sub-command>`` may be:
- ``romfs query`` to list all the available ROMFS partitions and their size
- ``romfs [-o <output>] build <source>`` to create a ROMFS image from the given
source directory; the default output file is the source appended by ``.romfs``
- ``romfs [-p <partition>] deploy <source>`` to deploy a ROMFS image to the device;
will also create a temporary ROMFS image if the source is a directory
The ``build`` and ``deploy`` sub-commands both support the ``-m``/``--mpy`` option
to automatically compile ``.py`` files to ``.mpy`` when creating the ROMFS image.
This option is enabled by default, but only works if the ``mpy_cross`` Python
package has been installed (eg via ``pip install mpy_cross``). If the package is
not installed then a warning is printed and ``.py`` files remain as is. Compiling
of ``.py`` files can be disabled with the ``--no-mpy`` option.
.. _mpremote_command_rtc:
- **rtc** -- set/get the device clock (RTC):

View File

@@ -1,12 +1,15 @@
import binascii
import hashlib
import os
import sys
import tempfile
import zlib
import serial.tools.list_ports
from .transport import TransportError, stdout_write_bytes
from .transport import TransportError, TransportExecError, stdout_write_bytes
from .transport_serial import SerialTransport
from .romfs import make_romfs
class CommandError(Exception):
@@ -478,3 +481,181 @@ def do_rtc(state, args):
state.transport.exec("machine.RTC().datetime({})".format(timetuple))
else:
print(state.transport.eval("machine.RTC().datetime()"))
def _do_romfs_query(state, args):
state.ensure_raw_repl()
state.did_action()
# Detect the romfs and get its associated device.
state.transport.exec("import vfs")
if not state.transport.eval("hasattr(vfs,'rom_ioctl')"):
print("ROMFS is not enabled on this device")
return
num_rom_partitions = state.transport.eval("vfs.rom_ioctl(1)")
if num_rom_partitions <= 0:
print("No ROMFS partitions available")
return
for rom_id in range(num_rom_partitions):
state.transport.exec(f"dev=vfs.rom_ioctl(2,{rom_id})")
has_object = state.transport.eval("hasattr(dev,'ioctl')")
if has_object:
rom_block_count = state.transport.eval("dev.ioctl(4,0)")
rom_block_size = state.transport.eval("dev.ioctl(5,0)")
rom_size = rom_block_count * rom_block_size
print(
f"ROMFS{rom_id} partition has size {rom_size} bytes ({rom_block_count} blocks of {rom_block_size} bytes each)"
)
else:
rom_size = state.transport.eval("len(dev)")
print(f"ROMFS{rom_id} partition has size {rom_size} bytes")
romfs = state.transport.eval("bytes(memoryview(dev)[:12])")
print(f" Raw contents: {romfs.hex(':')} ...")
if not romfs.startswith(b"\xd2\xcd\x31"):
print(" Not a valid ROMFS")
else:
size = 0
for value in romfs[3:]:
size = (size << 7) | (value & 0x7F)
if not value & 0x80:
break
print(f" ROMFS image size: {size}")
def _do_romfs_build(state, args):
state.did_action()
if args.path is None:
raise CommandError("romfs build: source path not given")
input_directory = args.path
if args.output is None:
output_file = input_directory + ".romfs"
else:
output_file = args.output
romfs = make_romfs(input_directory, mpy_cross=args.mpy)
print(f"Writing {len(romfs)} bytes to output file {output_file}")
with open(output_file, "wb") as f:
f.write(romfs)
def _do_romfs_deploy(state, args):
state.ensure_raw_repl()
state.did_action()
transport = state.transport
if args.path is None:
raise CommandError("romfs deploy: source path not given")
rom_id = args.partition
romfs_filename = args.path
# Read in or create the ROMFS filesystem image.
if romfs_filename.endswith(".romfs"):
with open(romfs_filename, "rb") as f:
romfs = f.read()
else:
romfs = make_romfs(romfs_filename, mpy_cross=args.mpy)
print(f"Image size is {len(romfs)} bytes")
# Detect the ROMFS partition and get its associated device.
state.transport.exec("import vfs")
if not state.transport.eval("hasattr(vfs,'rom_ioctl')"):
raise CommandError("ROMFS is not enabled on this device")
transport.exec(f"dev=vfs.rom_ioctl(2,{rom_id})")
if transport.eval("isinstance(dev,int) and dev<0"):
raise CommandError(f"ROMFS{rom_id} partition not found on device")
has_object = transport.eval("hasattr(dev,'ioctl')")
if has_object:
rom_block_count = transport.eval("dev.ioctl(4,0)")
rom_block_size = transport.eval("dev.ioctl(5,0)")
rom_size = rom_block_count * rom_block_size
print(
f"ROMFS{rom_id} partition has size {rom_size} bytes ({rom_block_count} blocks of {rom_block_size} bytes each)"
)
else:
rom_size = transport.eval("len(dev)")
print(f"ROMFS{rom_id} partition has size {rom_size} bytes")
# Check if ROMFS filesystem image will fit in the target partition.
if len(romfs) > rom_size:
print("ROMFS image is too big for the target partition")
sys.exit(1)
# Prepare ROMFS partition for writing.
print(f"Preparing ROMFS{rom_id} partition for writing")
transport.exec("import vfs\ntry:\n vfs.umount('/rom')\nexcept:\n pass")
chunk_size = 4096
if has_object:
for offset in range(0, len(romfs), rom_block_size):
transport.exec(f"dev.ioctl(6,{offset // rom_block_size})")
chunk_size = min(chunk_size, rom_block_size)
else:
rom_min_write = transport.eval(f"vfs.rom_ioctl(3,{rom_id},{len(romfs)})")
chunk_size = max(chunk_size, rom_min_write)
# Detect capabilities of the device to use the fastest method of transfer.
has_bytes_fromhex = transport.eval("hasattr(bytes,'fromhex')")
try:
transport.exec("from binascii import a2b_base64")
has_a2b_base64 = True
except TransportExecError:
has_a2b_base64 = False
try:
transport.exec("from io import BytesIO")
transport.exec("from deflate import DeflateIO,RAW")
has_deflate_io = True
except TransportExecError:
has_deflate_io = False
# Deploy the ROMFS filesystem image to the device.
for offset in range(0, len(romfs), chunk_size):
romfs_chunk = romfs[offset : offset + chunk_size]
romfs_chunk += bytes(chunk_size - len(romfs_chunk))
if has_deflate_io:
# Needs: binascii.a2b_base64, io.BytesIO, deflate.DeflateIO.
romfs_chunk_compressed = zlib.compress(romfs_chunk, wbits=-9)
buf = binascii.b2a_base64(romfs_chunk_compressed).strip()
transport.exec(f"buf=DeflateIO(BytesIO(a2b_base64({buf})),RAW,9).read()")
elif has_a2b_base64:
# Needs: binascii.a2b_base64.
buf = binascii.b2a_base64(romfs_chunk)
transport.exec(f"buf=a2b_base64({buf})")
elif has_bytes_fromhex:
# Needs: bytes.fromhex.
buf = romfs_chunk.hex()
transport.exec(f"buf=bytes.fromhex('{buf}')")
else:
# Needs nothing special.
transport.exec("buf=" + repr(romfs_chunk))
print(f"\rWriting at offset {offset}", end="")
if has_object:
transport.exec(
f"dev.writeblocks({offset // rom_block_size},buf,{offset % rom_block_size})"
)
else:
transport.exec(f"vfs.rom_ioctl(4,{rom_id},{offset},buf)")
# Complete writing.
if not has_object:
transport.eval(f"vfs.rom_ioctl(5,{rom_id})")
print()
print("ROMFS image deployed")
def do_romfs(state, args):
if args.command[0] == "query":
_do_romfs_query(state, args)
elif args.command[0] == "build":
_do_romfs_build(state, args)
elif args.command[0] == "deploy":
_do_romfs_deploy(state, args)
else:
raise CommandError(
f"romfs: '{args.command[0]}' is not a command; pass romfs --help for a list"
)

View File

@@ -36,6 +36,7 @@ from .commands import (
do_resume,
do_rtc,
do_soft_reset,
do_romfs,
)
from .mip import do_mip
from .repl import do_repl
@@ -228,6 +229,32 @@ def argparse_mip():
return cmd_parser
def argparse_romfs():
cmd_parser = argparse.ArgumentParser(description="manage ROM partitions")
_bool_flag(
cmd_parser,
"mpy",
"m",
True,
"automatically compile .py files to .mpy when building the ROMFS image (default)",
)
cmd_parser.add_argument(
"--partition",
"-p",
type=int,
default=0,
help="ROMFS partition to use",
)
cmd_parser.add_argument(
"--output",
"-o",
help="output file",
)
cmd_parser.add_argument("command", nargs=1, help="romfs command, one of: query, build, deploy")
cmd_parser.add_argument("path", nargs="?", help="path to directory to deploy")
return cmd_parser
def argparse_none(description):
return lambda: argparse.ArgumentParser(description=description)
@@ -302,6 +329,10 @@ _COMMANDS = {
do_version,
argparse_none("print version and exit"),
),
"romfs": (
do_romfs,
argparse_romfs,
),
}
# Additional commands aliases.

View File

@@ -0,0 +1,148 @@
# MIT license; Copyright (c) 2022 Damien P. George
import struct, sys, os
try:
from mpy_cross import run as mpy_cross_run
except ImportError:
mpy_cross_run = None
class VfsRomWriter:
ROMFS_HEADER = b"\xd2\xcd\x31"
ROMFS_RECORD_KIND_UNUSED = 0
ROMFS_RECORD_KIND_PADDING = 1
ROMFS_RECORD_KIND_DATA_VERBATIM = 2
ROMFS_RECORD_KIND_DATA_POINTER = 3
ROMFS_RECORD_KIND_DIRECTORY = 4
ROMFS_RECORD_KIND_FILE = 5
def __init__(self):
self._dir_stack = [(None, bytearray())]
def _encode_uint(self, value):
encoded = [value & 0x7F]
value >>= 7
while value != 0:
encoded.insert(0, 0x80 | (value & 0x7F))
value >>= 7
return bytes(encoded)
def _pack(self, kind, payload):
return self._encode_uint(kind) + self._encode_uint(len(payload)) + payload
def _extend(self, data):
buf = self._dir_stack[-1][1]
buf.extend(data)
return len(buf)
def finalise(self):
_, data = self._dir_stack.pop()
encoded_kind = VfsRomWriter.ROMFS_HEADER
encoded_len = self._encode_uint(len(data))
if (len(encoded_kind) + len(encoded_len) + len(data)) % 2 == 1:
encoded_len = b"\x80" + encoded_len
data = encoded_kind + encoded_len + data
return data
def opendir(self, dirname):
self._dir_stack.append((dirname, bytearray()))
def closedir(self):
dirname, dirdata = self._dir_stack.pop()
dirdata = self._encode_uint(len(dirname)) + bytes(dirname, "ascii") + dirdata
self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DIRECTORY, dirdata))
def mkdata(self, data):
assert len(self._dir_stack) == 1
return self._extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, data)) - len(
data
)
def mkfile(self, filename, filedata):
filename = bytes(filename, "ascii")
payload = self._encode_uint(len(filename))
payload += filename
if isinstance(filedata, tuple):
sub_payload = self._encode_uint(filedata[0])
sub_payload += self._encode_uint(filedata[1])
payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_POINTER, sub_payload)
else:
payload += self._pack(VfsRomWriter.ROMFS_RECORD_KIND_DATA_VERBATIM, filedata)
self._dir_stack[-1][1].extend(self._pack(VfsRomWriter.ROMFS_RECORD_KIND_FILE, payload))
def copy_recursively(vfs, src_dir, print_prefix, mpy_cross):
assert src_dir.endswith("/")
DIR = 1 << 14
mpy_cross_missed = 0
dir_contents = sorted(os.listdir(src_dir))
for name in dir_contents:
src_name = src_dir + name
st = os.stat(src_name)
if name == dir_contents[-1]:
# Last entry in the directory listing.
print_entry = "\\--"
print_recurse = " "
else:
# Not the last entry in the directory listing.
print_entry = "|--"
print_recurse = "| "
if st[0] & DIR:
# A directory, enter it and copy its contents recursively.
print(print_prefix + print_entry, name + "/")
vfs.opendir(name)
mpy_cross_missed += copy_recursively(
vfs, src_name + "/", print_prefix + print_recurse, mpy_cross
)
vfs.closedir()
else:
# A file.
did_mpy = False
name_extra = ""
if mpy_cross and name.endswith(".py"):
name_mpy = name[:-3] + ".mpy"
src_name_mpy = src_dir + name_mpy
if not os.path.isfile(src_name_mpy):
if mpy_cross_run is not None:
did_mpy = True
proc = mpy_cross_run(src_name)
proc.wait()
else:
mpy_cross_missed += 1
if did_mpy:
name_extra = " -> .mpy"
print(print_prefix + print_entry, name + name_extra)
if did_mpy:
name = name_mpy
src_name = src_name_mpy
with open(src_name, "rb") as src:
vfs.mkfile(name, src.read())
if did_mpy:
os.remove(src_name_mpy)
return mpy_cross_missed
def make_romfs(src_dir, *, mpy_cross):
if not src_dir.endswith("/"):
src_dir += "/"
vfs = VfsRomWriter()
# Build the filesystem recursively.
print("Building romfs filesystem, source directory: {}".format(src_dir))
print("/")
try:
mpy_cross_missed = copy_recursively(vfs, src_dir, "", mpy_cross)
except OSError as er:
print("Error: OSError {}".format(er), file=sys.stderr)
sys.exit(1)
if mpy_cross_missed:
print("Warning: `mpy_cross` module not found, .py files were not precompiled")
mpy_cross = False
return vfs.finalise()