mirror of
https://github.com/micropython/micropython.git
synced 2025-12-16 09:50:15 +01:00
The existing `serial_test.py` script tests data in/out throughput and reliability. But it only tests data sizes that are a power of two, which may not catch certain errors with USB transmission (because FS packet size is 64 bytes). This commit adds a new echo sub-test to the `serial_test.py` script. It sends out data to the target and gets the target to echo it back, and then compares the result (the echo'd data should be equal to the sent data). It does this for data packets between 1 and 520 (inclusive) bytes, which covers USB FS and HS packet sizes (64 and 512 respectively). It uses random data for the test, but seeded by a constant seed so that it's deterministic. If there's an error then it prints out all the sent and echo'd data to make it easier to see where it went wrong (eg if the previous packet was repeated). Signed-off-by: Damien George <damien@micropython.org>
339 lines
9.4 KiB
Python
Executable File
339 lines
9.4 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#
|
|
# Performance and reliability test for serial port communication.
|
|
#
|
|
# Basic usage:
|
|
# serial_test.py [-t serial-device]
|
|
#
|
|
# The `serial-device` will default to /dev/ttyACM0.
|
|
|
|
import argparse
|
|
import random
|
|
import serial
|
|
import sys
|
|
import time
|
|
|
|
run_tests_module = __import__("run-tests")
|
|
|
|
echo_test_script = """
|
|
import sys
|
|
bytes_min=%u
|
|
bytes_max=%u
|
|
repeat=%u
|
|
b=memoryview(bytearray(bytes_max))
|
|
for n in range(bytes_min,bytes_max+1):
|
|
for _ in range(repeat):
|
|
n2 = sys.stdin.readinto(b[:n])
|
|
sys.stdout.write(b[:n2])
|
|
"""
|
|
|
|
read_test_script = """
|
|
bin = True
|
|
try:
|
|
wr=__import__("pyb").USB_VCP(0).send
|
|
except:
|
|
import sys
|
|
if hasattr(sys.stdout,'buffer'):
|
|
wr=sys.stdout.buffer.write
|
|
else:
|
|
wr=sys.stdout.write
|
|
bin = False
|
|
b=bytearray(%u)
|
|
if bin:
|
|
wr('BIN')
|
|
for i in range(len(b)):
|
|
b[i] = i & 0xff
|
|
else:
|
|
wr('TXT')
|
|
for i in range(len(b)):
|
|
b[i] = 0x20 + (i & 0x3f)
|
|
for _ in range(%d):
|
|
wr(b)
|
|
"""
|
|
|
|
|
|
write_test_script_verified = """
|
|
import sys
|
|
try:
|
|
rd=__import__("pyb").USB_VCP(0).recv
|
|
except:
|
|
rd=sys.stdin.readinto
|
|
b=bytearray(%u)
|
|
for _ in range(%u):
|
|
n = rd(b)
|
|
fail = 0
|
|
for i in range(n):
|
|
if b[i] != 32 + (i & 0x3f):
|
|
fail += 1
|
|
if fail:
|
|
sys.stdout.write(b'ER%%05u' %% fail)
|
|
else:
|
|
sys.stdout.write(b'OK%%05u' %% n)
|
|
"""
|
|
|
|
write_test_script_unverified = """
|
|
import sys
|
|
try:
|
|
rd=__import__("pyb").USB_VCP(0).recv
|
|
except:
|
|
rd=sys.stdin.readinto
|
|
b=bytearray(%u)
|
|
for _ in range(%u):
|
|
n = rd(b)
|
|
if n != len(b):
|
|
sys.stdout.write(b'ER%%05u' %% n)
|
|
else:
|
|
sys.stdout.write(b'OK%%05u' %% n)
|
|
"""
|
|
|
|
|
|
class TestError(Exception):
    """Error raised by the serial test harness when a test cannot proceed."""
|
|
|
|
|
|
def drain_input(ser):
    """Discard everything pending on the serial port `ser` until it goes quiet.

    Waits 100ms so that in-flight data has a chance to arrive, then keeps
    reading and throwing away whatever is buffered, pausing 100ms after each
    read so trailing data is caught too.  Returns (None) as soon as a poll
    finds nothing waiting.
    """
    time.sleep(0.1)
    while ser.inWaiting() > 0:
        # The data itself is of no interest, so it is not kept.
        ser.read(ser.inWaiting())
        time.sleep(0.1)
|
|
|
|
|
|
def send_script(ser, script):
    """Upload `script` to the target over `ser` and start it running.

    `ser` is the serial port attached to the target's REPL and `script` is the
    program text as a bytes object.

    Raises TestError (carrying everything the target sent back) if the target
    does not answer with b"OK" once the script has been submitted.
    """
    # Get the target into a known state and throw away whatever it prints.
    ser.write(b"\x03\x01\x04")  # break, raw-repl, soft-reboot
    drain_input(ser)

    # Feed the script in small pieces with a short pause after each one
    # (presumably so the target's input buffer is not overrun).
    chunk_size = 32
    offset = 0
    while offset < len(script):
        ser.write(script[offset : offset + chunk_size])
        time.sleep(0.01)
        offset += chunk_size

    ser.write(b"\x04")  # eof
    ser.flush()

    # The target acknowledges the script with "OK".
    response = ser.read(2)
    if response == b"OK":
        return

    # Collect the rest of the output to include it in the error.
    response += ser.read(ser.inWaiting())
    raise TestError("could not send script", response)
|
|
|
|
|
|
def echo_test(ser_repl, ser_data):
    """Check that data sent to the target is echoed back unchanged.

    The echo script is loaded through `ser_repl`; packets of every size from
    1 to 520 bytes are then written to `ser_data` and the echoed bytes are read
    back from `ser_data` and compared with what was sent.  The packet content
    is pseudo-random but seeded with a constant, so every run sends the same
    data.

    Prints a one-line summary on success.  On failure sets the module-level
    `test_passed` flag to False and prints every sent packet together with the
    echo that was received for the mismatching ones.  Returns None.

    May raise TestError (from send_script) if the script cannot be loaded.
    """
    global test_passed

    # Make the test data deterministic.
    random.seed(0)

    # Set parameters for the test.
    # Go just a bit above the size of a USB high-speed packet.
    bytes_min = 1
    bytes_max = 520
    num_repeat = 1

    # Load and run the echo_test_script.
    script = bytes(echo_test_script % (bytes_min, bytes_max, num_repeat), "ascii")
    send_script(ser_repl, script)

    # A selection of printable bytes for echo data (0-9, A-Z, a-z).
    printable_bytes = list(range(48, 58)) + list(range(65, 91)) + list(range(97, 123))

    # Write data to the device and record the echo'd data.
    # Use a different selection of random printable characters for each
    # echo, to make it easier to debug when the echo doesn't match.
    num_errors = 0
    echo_results = []
    for num_bytes in range(bytes_min, bytes_max + 1):
        print(f"DATA ECHO: {num_bytes} / {bytes_max}", end="\r")
        for _ in range(num_repeat):
            rand_bytes = [random.choice(printable_bytes) for _ in range(8)]
            buf = bytes(random.choice(rand_bytes) for _ in range(num_bytes))
            ser_data.write(buf)
            buf2 = ser_data.read(len(buf))
            match = buf == buf2
            num_errors += not match
            echo_results.append((match, buf, buf2))
        if num_errors > 8:
            # Stop early if there are too many errors.
            break
    ser_repl.write(b"\x03")

    # Print results.
    if num_errors == 0:
        print("DATA ECHO: OK for {}-{} bytes at a time".format(bytes_min, bytes_max))
        return

    test_passed = False
    print("DATA ECHO: FAIL ")
    for match, buf, buf2 in echo_results:
        print(" sent", len(buf), buf)
        if match:
            print(" echo match")
        else:
            # Report the length of what actually came back, which may be
            # shorter than what was sent if the read timed out.
            print(" echo", len(buf2), buf2)
|
|
|
|
|
|
def read_test(ser_repl, ser_data, bufsize, nbuf):
    """Measure device-to-host throughput and verify the received data.

    The read test script is loaded through `ser_repl`; it makes the target
    send a `bufsize`-byte buffer `nbuf` times.  The data is read from
    `ser_data`, timed, and then checked byte by byte against the pattern the
    target announced ("BIN" or "TXT").

    `bufsize` must be a multiple of 256 so that the pattern, which the target
    restarts at the beginning of every buffer, lines up with the running byte
    index used for verification here.

    Returns the measured throughput in bytes per second, or 0 if the target
    stopped sending before all the data arrived.  Sets the module-level
    `test_passed` flag to False on a timeout or on any data mismatch.

    May raise TestError (from send_script) if the script cannot be loaded.
    """
    global test_passed

    assert bufsize % 256 == 0  # for verify to work

    # how long to wait for data from device
    # (if UART TX is also enabled then it can take 1.4s to send
    # out a 16KB buffer at 115200bps)
    READ_TIMEOUT_S = 2

    # Load and run the read_test_script.
    script = bytes(read_test_script % (bufsize, nbuf), "ascii")
    send_script(ser_repl, script)

    # Read from the device the type of data that it will send (BIN or TXT).
    data_type = ser_data.read(3)

    # Read data from the device and measure throughput.  perf_counter() is
    # used for the interval because it is monotonic and high resolution.
    n = 0
    t_start = time.perf_counter()
    remain = nbuf * bufsize
    total_data = bytearray(remain)
    while remain:
        # Poll until some data is available or the timeout expires.
        t0 = time.monotonic_ns()
        while ser_data.inWaiting() == 0:
            if time.monotonic_ns() - t0 > READ_TIMEOUT_S * 1e9:
                # timeout waiting for data from device
                break
            time.sleep(0.0001)
        if not ser_data.inWaiting():
            test_passed = False
            print("ERROR: timeout waiting for data")
            print(total_data[:n])
            return 0
        to_read = min(ser_data.inWaiting(), remain)
        data = ser_data.read(to_read)
        remain -= len(data)
        total_data[n : n + len(data)] = data
        n += len(data)
        print(f"{n} / {nbuf * bufsize}", end="\r")
    t_end = time.perf_counter()

    # Check the received data is correct.  Anything other than "BIN" is
    # verified against the text pattern.
    is_binary = data_type == b"BIN"
    for i, got in enumerate(total_data):
        wanted = i & 0xFF if is_binary else 0x20 + (i & 0x3F)
        if got != wanted:
            test_passed = False
            print("ERROR: data mismatch:", i, wanted, got)
    ser_repl.write(b"\x03")  # break
    t = t_end - t_start

    # Print results.
    print(
        "DATA IN: bufsize=%u, read %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
        % (bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
    )

    return n / t
|
|
|
|
|
|
def write_test(ser_repl, ser_data, bufsize, nbuf, verified):
    """Measure host-to-device throughput.

    One of the two write test scripts is loaded through `ser_repl`: the
    verified one (the target checks every byte) when `verified` is true,
    otherwise the unverified one (the target only checks the byte count).
    A `bufsize`-byte buffer of printable characters is then written to
    `ser_data` `nbuf` times, and after each write the target's 7-byte
    status reply is read from `ser_repl`.

    Returns the measured throughput in bytes per second.  Sets the
    module-level `test_passed` flag to False if any reply is not the expected
    "OK" + bufsize.

    May raise TestError (from send_script) if the script cannot be loaded.
    """
    global test_passed

    # Load and run the write_test_script.
    if verified:
        script = write_test_script_verified
    else:
        script = write_test_script_unverified
    script = bytes(script % (bufsize, nbuf), "ascii")
    send_script(ser_repl, script)
    drain_input(ser_repl)

    # Prepare the data and the expected reply before starting the clock, so
    # that only the transfer itself is timed.
    buf = bytes(32 + (i & 0x3F) for i in range(bufsize))  # don't want to send ctrl chars!
    expected_response = b"OK%05u" % bufsize

    # Write data to the device, check the device's reply, and measure
    # throughput.
    n = 0
    t_start = time.perf_counter()
    for _ in range(nbuf):
        ser_data.write(buf)
        n += len(buf)
        print(f"{n} / {nbuf * bufsize}", end="\r")
        response = ser_repl.read(7)
        if response != expected_response:
            test_passed = False
            print("ERROR: bad response, expecting OK%05u, got %r" % (bufsize, response))
    t_end = time.perf_counter()
    ser_repl.write(b"\x03")  # break
    t = t_end - t_start

    # Print results.
    print(
        "DATA OUT: verify=%d, bufsize=%u, wrote %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
        % (verified, bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
    )

    return n / t
|
|
|
|
|
|
def do_test(dev_repl, dev_data=None, time_per_subtest=1):
    """Run the echo test followed by the read and write throughput tests.

    `dev_repl` is the serial device carrying the target's REPL.  `dev_data`
    is the serial device used for the test data; when it is None the REPL
    device is used for the data as well.  `time_per_subtest` is the
    approximate duration, in seconds, that each throughput subtest is scaled
    to take.

    The throughput tests are skipped if the echo test fails.  The serial
    ports are always closed before returning, including when a test raises.
    """
    open_ports = []
    try:
        if dev_data is None:
            print("REPL and data on", dev_repl)
            ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
            open_ports.append(ser_repl)
            ser_data = ser_repl
        else:
            print("REPL on", dev_repl)
            print("data on", dev_data)
            ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
            open_ports.append(ser_repl)
            ser_data = serial.Serial(dev_data, baudrate=115200, timeout=1)
            open_ports.append(ser_data)

        # Do echo test first, and abort if it doesn't pass.
        echo_test(ser_repl, ser_data)
        if not test_passed:
            return

        # Do read and write throughput test.
        for test_func, test_args, bufsize in (
            (read_test, (), 256),
            (write_test, (True,), 128),
            (write_test, (False,), 128),
        ):
            nbuf = 128
            while bufsize <= 16384:
                rate = test_func(ser_repl, ser_data, bufsize, nbuf, *test_args)
                bufsize *= 2
                if rate:
                    # Adjust the amount of data based on the rate, to keep each subtest
                    # at around time_per_subtest seconds long.
                    nbuf = max(min(128, int(rate * time_per_subtest / bufsize)), 1)
    finally:
        # Release the ports on every exit path (early return or exception).
        for port in open_ports:
            port.close()
|
|
|
|
|
|
def main():
    """Parse the command line, run the tests and exit non-zero on failure.

    The overall result is tracked in the module-level `test_passed` flag,
    which the individual tests clear when they detect a problem.  A TestError
    raised by a test is reported and also counts as a failure.
    """
    global test_passed

    cmd_parser = argparse.ArgumentParser(
        description="Test performance and reliability of serial port communication.",
        epilog=run_tests_module.test_instance_epilog,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    cmd_parser.add_argument(
        "-t",
        "--test-instance",
        default="a0",
        help="MicroPython instance to test",
    )
    # Let argparse do the conversion so an invalid value is reported as a
    # normal usage error instead of a ValueError traceback.
    cmd_parser.add_argument(
        "--time-per-subtest",
        type=float,
        default=1.0,
        help="approximate time to take per subtest (in seconds)",
    )
    args = cmd_parser.parse_args()

    dev_repl = run_tests_module.convert_device_shortcut_to_real_device(args.test_instance)

    test_passed = True
    try:
        do_test(dev_repl, None, args.time_per_subtest)
    except TestError as er:
        test_passed = False
        print("ERROR:", er)

    if not test_passed:
        sys.exit(1)
|
|
|
|
|
|
# Run the tests only when this file is executed as a script, not when it is
# imported.
if __name__ == "__main__":
    main()
|