micropython/tests/serial_test.py
Angus Gratton 2762fe680a
tests/serial_test.py: Allow up to 2 seconds between bytes.
Only a problem when UART TX is also enabled and goes first (i.e. esp32
port) as sending 16384 bytes in one go triggers the timeout.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2025-11-06 12:26:43 +11:00
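
The 2-second figure in the commit message can be sanity-checked with a little arithmetic. The sketch below is not part of the commit: it assumes 8N1 framing (10 bits on the wire per byte), which the message does not state, and `uart_tx_time_s` is a hypothetical helper name used only for illustration.

```python
def uart_tx_time_s(num_bytes, baudrate, bits_per_byte=10):
    """Seconds needed to clock num_bytes out of a UART.

    bits_per_byte=10 assumes 8N1 framing (1 start + 8 data + 1 stop bit);
    this is an assumption, not something stated in the commit.
    """
    return num_bytes * bits_per_byte / baudrate


if __name__ == "__main__":
    # Largest buffer used by the test, at the baud rate the test opens the port with.
    t = uart_tx_time_s(16384, 115200)
    print(f"{t:.2f} s")  # prints "1.42 s"
```

Under that assumption a 16384-byte buffer needs roughly 1.4 seconds, so a 2-second gap allowance leaves some margin.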

269 lines
7.3 KiB
Python
Executable File

#!/usr/bin/env python
#
# Performance and reliability test for serial port communication.
#
# Basic usage:
#   serial_test.py [-t serial-device]
#
# The `serial-device` will default to /dev/ttyACM0.

import argparse
import serial
import sys
import time

run_tests_module = __import__("run-tests")

read_test_script = """
bin = True
try:
    wr=__import__("pyb").USB_VCP(0).send
except:
    import sys
    if hasattr(sys.stdout,'buffer'):
        wr=sys.stdout.buffer.write
    else:
        wr=sys.stdout.write
        bin = False
b=bytearray(%u)
if bin:
    wr('BIN')
    for i in range(len(b)):
        b[i] = i & 0xff
else:
    wr('TXT')
    for i in range(len(b)):
        b[i] = 0x20 + (i & 0x3f)
for _ in range(%d):
    wr(b)
"""

write_test_script_verified = """
import sys
try:
    rd=__import__("pyb").USB_VCP(0).recv
except:
    rd=sys.stdin.readinto
b=bytearray(%u)
for _ in range(%u):
    n = rd(b)
    fail = 0
    for i in range(n):
        if b[i] != 32 + (i & 0x3f):
            fail += 1
    if fail:
        sys.stdout.write(b'ER%%05u' %% fail)
    else:
        sys.stdout.write(b'OK%%05u' %% n)
"""

write_test_script_unverified = """
import sys
try:
    rd=__import__("pyb").USB_VCP(0).recv
except:
    rd=sys.stdin.readinto
b=bytearray(%u)
for _ in range(%u):
    n = rd(b)
    if n != len(b):
        sys.stdout.write(b'ER%%05u' %% n)
    else:
        sys.stdout.write(b'OK%%05u' %% n)
"""


class TestError(Exception):
    pass


def drain_input(ser):
    time.sleep(0.1)
    while ser.inWaiting() > 0:
        data = ser.read(ser.inWaiting())
        time.sleep(0.1)


def send_script(ser, script):
    chunk_size = 32
    for i in range(0, len(script), chunk_size):
        ser.write(script[i : i + chunk_size])
        time.sleep(0.01)
    ser.write(b"\x04")  # eof
    ser.flush()
    response = ser.read(2)
    if response != b"OK":
        response += ser.read(ser.inWaiting())
        raise TestError("could not send script", response)


def read_test(ser_repl, ser_data, bufsize, nbuf):
    global test_passed

    assert bufsize % 256 == 0  # for verify to work

    # how long to wait for data from device
    # (if UART TX is also enabled then it can take 1.4s to send
    # out a 16KB buffer at 115200bps)
    READ_TIMEOUT_S = 2

    # Load and run the read_test_script.
    ser_repl.write(b"\x03\x01\x04")  # break, raw-repl, soft-reboot
    drain_input(ser_repl)
    script = bytes(read_test_script % (bufsize, nbuf), "ascii")
    send_script(ser_repl, script)

    # Read from the device the type of data that it will send (BIN or TXT).
    data_type = ser_data.read(3)

    # Read data from the device, check it is correct, and measure throughput.
    n = 0
    last_byte = None
    t_start = time.time()
    remain = nbuf * bufsize
    total_data = bytearray(remain)
    while remain:
        t0 = time.monotonic_ns()
        while ser_data.inWaiting() == 0:
            if time.monotonic_ns() - t0 > READ_TIMEOUT_S * 1e9:
                # timeout waiting for data from device
                break
            time.sleep(0.0001)
        if not ser_data.inWaiting():
            test_passed = False
            print("ERROR: timeout waiting for data")
            print(total_data[:n])
            return 0

        to_read = min(ser_data.inWaiting(), remain)
        data = ser_data.read(to_read)
        remain -= len(data)
        print(f"{n} / {nbuf * bufsize}", end="\r")
        total_data[n : n + len(data)] = data
        n += len(data)
    t_end = time.time()
    for i in range(0, len(total_data)):
        if data_type == b"BIN":
            wanted = i & 0xFF
        else:
            wanted = 0x20 + (i & 0x3F)
        if total_data[i] != wanted:
            test_passed = False
            print("ERROR: data mismatch:", i, wanted, total_data[i])
    ser_repl.write(b"\x03")  # break
    t = t_end - t_start

    # Print results.
    print(
        "DATA IN: bufsize=%u, read %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
        % (bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
    )

    return n / t


def write_test(ser_repl, ser_data, bufsize, nbuf, verified):
    global test_passed

    # Load and run the write_test_script.
    ser_repl.write(b"\x03\x01\x04")  # break, raw-repl, soft-reboot
    drain_input(ser_repl)
    if verified:
        script = write_test_script_verified
    else:
        script = write_test_script_unverified
    script = bytes(script % (bufsize, nbuf), "ascii")
    send_script(ser_repl, script)
    drain_input(ser_repl)

    # Write data to the device, check it is correct, and measure throughput.
    n = 0
    t_start = time.time()
    buf = bytearray(bufsize)
    for i in range(len(buf)):
        buf[i] = 32 + (i & 0x3F)  # don't want to send ctrl chars!
    for i in range(nbuf):
        ser_data.write(buf)
        n += len(buf)
        print(f"{n} / {nbuf * bufsize}", end="\r")
        response = ser_repl.read(7)
        if response != b"OK%05u" % bufsize:
            test_passed = False
            print("ERROR: bad response, expecting OK%05u, got %r" % (bufsize, response))
    t_end = time.time()
    ser_repl.write(b"\x03")  # break
    t = t_end - t_start

    # Print results.
    print(
        "DATA OUT: verify=%d, bufsize=%u, wrote %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
        % (verified, bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
    )

    return n / t


def do_test(dev_repl, dev_data=None, time_per_subtest=1):
    if dev_data is None:
        print("REPL and data on", dev_repl)
        ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
        ser_data = ser_repl
    else:
        print("REPL on", dev_repl)
        print("data on", dev_data)
        ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
        ser_data = serial.Serial(dev_data, baudrate=115200, timeout=1)

    for test_func, test_args, bufsize in (
        (read_test, (), 256),
        (write_test, (True,), 128),
        (write_test, (False,), 128),
    ):
        nbuf = 128
        while bufsize <= 16384:
            rate = test_func(ser_repl, ser_data, bufsize, nbuf, *test_args)
            bufsize *= 2
            if rate:
                # Adjust the amount of data based on the rate, to keep each subtest
                # at around time_per_subtest seconds long.
                nbuf = max(min(128, int(rate * time_per_subtest / bufsize)), 1)

    ser_repl.close()
    ser_data.close()


def main():
    global test_passed

    cmd_parser = argparse.ArgumentParser(
        description="Test performance and reliability of serial port communication.",
        epilog=run_tests_module.test_instance_epilog,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    cmd_parser.add_argument(
        "-t",
        "--test-instance",
        default="a0",
        help="MicroPython instance to test",
    )
    cmd_parser.add_argument(
        "--time-per-subtest", default="1", help="approximate time to take per subtest (in seconds)"
    )
    args = cmd_parser.parse_args()

    dev_repl = run_tests_module.convert_device_shortcut_to_real_device(args.test_instance)

    test_passed = True
    try:
        do_test(dev_repl, None, float(args.time_per_subtest))
    except TestError as er:
        test_passed = False
        print("ERROR:", er)

    if not test_passed:
        sys.exit(1)


if __name__ == "__main__":
    main()
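
The host-side check in `read_test` compares each received byte against a value derived from its absolute offset in the whole stream, which is presumably why the function asserts `bufsize % 256 == 0`. The standalone sketch below reproduces that check without a serial port. `make_pattern` and `count_mismatches` are hypothetical helper names, not part of the test script.

```python
def make_pattern(bufsize, binary):
    """Build one buffer the way the device-side read script fills it."""
    if binary:
        return bytes(i & 0xFF for i in range(bufsize))
    return bytes(0x20 + (i & 0x3F) for i in range(bufsize))


def count_mismatches(stream, binary):
    """Compare each byte with the value expected at its absolute offset."""
    bad = 0
    for i, value in enumerate(stream):
        wanted = (i & 0xFF) if binary else 0x20 + (i & 0x3F)
        if value != wanted:
            bad += 1
    return bad


if __name__ == "__main__":
    # A buffer size that is a multiple of 256 keeps repeated buffers in
    # phase with the absolute-offset pattern.
    print(count_mismatches(make_pattern(256, True) * 4, True))  # prints 0
    # A buffer size that is not a multiple of 256 drifts out of phase
    # from the second buffer onwards.
    print(count_mismatches(make_pattern(100, True) * 2, True))  # prints 100
```

The same property means a single dropped byte shows up as a mismatch on every later byte, so the check detects lost data as well as corrupted data.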