diff --git a/tests/run-internalbench.py b/tests/run-internalbench.py index 99c6304afe..4f2d29525d 100755 --- a/tests/run-internalbench.py +++ b/tests/run-internalbench.py @@ -8,9 +8,14 @@ import re from glob import glob from collections import defaultdict -run_tests_module = __import__("run-tests") -sys.path.append(run_tests_module.base_path("../tools")) -import pyboard +from test_utils import ( + base_path, + pyboard, + test_instance_description, + test_instance_epilog, + test_directory_description, + get_test_instance, +) if os.name == "nt": MICROPYTHON = os.getenv( @@ -97,10 +102,10 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, description=f"""Run and manage tests for MicroPython. -{run_tests_module.test_instance_description} -{run_tests_module.test_directory_description} +{test_instance_description} +{test_directory_description} """, - epilog=run_tests_module.test_instance_epilog, + epilog=test_instance_epilog, ) cmd_parser.add_argument( "-t", "--test-instance", default="unix", help="the MicroPython instance to test" @@ -124,9 +129,7 @@ def main(): args = cmd_parser.parse_args() # Note pyboard support is copied over from run-tests.py, not tests, and likely needs revamping - pyb = run_tests_module.get_test_instance( - args.test_instance, args.baudrate, args.user, args.password - ) + pyb = get_test_instance(args.test_instance, args.baudrate, args.user, args.password) if len(args.files) == 0: if args.test_dirs: diff --git a/tests/run-multitests.py b/tests/run-multitests.py index e5458ffe0d..c5bb8011e7 100755 --- a/tests/run-multitests.py +++ b/tests/run-multitests.py @@ -15,7 +15,13 @@ import itertools import subprocess import tempfile -run_tests_module = __import__("run-tests") +from test_utils import ( + base_path, + pyboard, + test_instance_epilog, + convert_device_shortcut_to_real_device, + create_test_report, +) test_dir = os.path.abspath(os.path.dirname(__file__)) @@ -24,9 +30,6 @@ if os.path.abspath(sys.path[0]) == test_dir: # accidentally importing tests like micropython/const.py sys.path.pop(0) -sys.path.insert(0, test_dir + "/../tools") -import pyboard - if os.name == "nt": CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3.exe") MICROPYTHON = os.path.abspath( @@ -554,7 +557,7 @@ def main(): cmd_parser = argparse.ArgumentParser( description="Run network tests for MicroPython", epilog=( - run_tests_module.test_instance_epilog + test_instance_epilog + "Each instance arg can optionally have custom env provided, eg. ,ENV=VAR,ENV=VAR...\n" ), formatter_class=argparse.RawTextHelpFormatter, @@ -582,7 +585,7 @@ def main(): cmd_parser.add_argument( "-r", "--result-dir", - default=run_tests_module.base_path("results"), + default=base_path("results"), help="directory for test results", ) cmd_parser.add_argument("files", nargs="+", help="input test files") @@ -612,7 +615,7 @@ def main(): print("unsupported instance string: {}".format(cmd), file=sys.stderr) sys.exit(2) else: - device = run_tests_module.convert_device_shortcut_to_real_device(cmd) + device = convert_device_shortcut_to_real_device(cmd) instances_test.append(PyInstancePyboard(device)) for _ in range(max_instances - len(instances_test)): @@ -626,7 +629,7 @@ def main(): break test_results = run_tests(test_files, instances_truth, instances_test_permutation) - all_pass &= run_tests_module.create_test_report(cmd_args, test_results) + all_pass &= create_test_report(cmd_args, test_results) finally: for i in instances_truth: diff --git a/tests/run-natmodtests.py b/tests/run-natmodtests.py index cd6c643bf9..cb0877e2cb 100755 --- a/tests/run-natmodtests.py +++ b/tests/run-natmodtests.py @@ -9,7 +9,13 @@ import subprocess import sys import argparse -run_tests_module = __import__("run-tests") +from test_utils import ( + base_path, + pyboard, + test_instance_epilog, + get_test_instance, + create_test_report, +) # Paths for host executables CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3") @@ -110,7 +116,7 @@ class TargetPyboard: output = self.pyb.exec_(script) output = output.replace(b"\r\n", b"\n") return output, None - except run_tests_module.pyboard.PyboardError as er: + except pyboard.PyboardError as er: return b"", er @@ -221,7 +227,7 @@ def main(): cmd_parser = argparse.ArgumentParser( description="Run dynamic-native-module tests under MicroPython", - epilog=run_tests_module.test_instance_epilog, + epilog=test_instance_epilog, formatter_class=argparse.RawDescriptionHelpFormatter, ) cmd_parser.add_argument( @@ -243,7 +249,7 @@ def main(): cmd_parser.add_argument( "-r", "--result-dir", - default=run_tests_module.base_path("results"), + default=base_path("results"), help="directory for test results", ) cmd_parser.add_argument("files", nargs="*", help="input test files") @@ -257,9 +263,7 @@ def main(): target_truth = TargetSubprocess([CPYTHON3]) - target = run_tests_module.get_test_instance( - args.test_instance, args.baudrate, args.user, args.password - ) + target = get_test_instance(args.test_instance, args.baudrate, args.user, args.password) if target is None: # Use the unix port of MicroPython. target = TargetSubprocess([MICROPYTHON]) @@ -283,7 +287,7 @@ def main(): os.makedirs(args.result_dir, exist_ok=True) test_results = run_tests(target_truth, target, args, target_arch) - res = run_tests_module.create_test_report(args, test_results) + res = create_test_report(args, test_results) target.close() target_truth.close() diff --git a/tests/run-perfbench.py b/tests/run-perfbench.py index 039d11a361..16182bc8a9 100755 --- a/tests/run-perfbench.py +++ b/tests/run-perfbench.py @@ -10,9 +10,13 @@ import sys import argparse from glob import glob -run_tests_module = __import__("run-tests") - -prepare_script_for_target = run_tests_module.prepare_script_for_target +from test_utils import ( + base_path, + pyboard, + get_test_instance, + prepare_script_for_target, + create_test_report, +) # Paths for host executables if os.name == "nt": @@ -49,7 +53,7 @@ def run_script_on_target(target, script): try: target.enter_raw_repl() output = target.exec_(script) - except run_tests_module.pyboard.PyboardError as er: + except pyboard.PyboardError as er: err = er else: # Run local executable @@ -277,7 +281,7 @@ def main(): cmd_parser.add_argument( "-r", "--result-dir", - default=run_tests_module.base_path("results"), + default=base_path("results"), help="directory for test results", ) cmd_parser.add_argument( @@ -298,9 +302,7 @@ def main(): M = int(args.M[0]) n_average = int(args.average) - target = run_tests_module.get_test_instance( - args.test_instance, args.baudrate, args.user, args.password - ) + target = get_test_instance(args.test_instance, args.baudrate, args.user, args.password) if target is None: # Use the unix port of MicroPython. target = [MICROPYTHON, "-X", "emit=" + args.emit] @@ -328,7 +330,7 @@ def main(): os.makedirs(args.result_dir, exist_ok=True) test_results = run_benchmarks(args, target, N, M, n_average, tests) - res = run_tests_module.create_test_report(args, test_results) + res = create_test_report(args, test_results) if hasattr(target, "exit_raw_repl"): target.exit_raw_repl() diff --git a/tests/run-tests.py b/tests/run-tests.py index 6dfba2c540..0ff358c06e 100755 --- a/tests/run-tests.py +++ b/tests/run-tests.py @@ -6,7 +6,6 @@ import sys import sysconfig import platform import argparse -import inspect import json import re from glob import glob @@ -15,23 +14,29 @@ from multiprocessing.pool import ThreadPool import threading import tempfile -# Maximum time to run a single test, in seconds. -TEST_TIMEOUT = float(os.environ.get("MICROPY_TEST_TIMEOUT", 30)) - -# See stackoverflow.com/questions/2632199: __file__ nor sys.argv[0] -# are guaranteed to always work, this one should though. -BASEPATH = os.path.dirname(os.path.abspath(inspect.getsourcefile(lambda: None))) +from test_utils import ( + base_path, + pyboard, + TEST_TIMEOUT, + MPYCROSS, + test_instance_description, + test_instance_epilog, + test_directory_description, + rm_f, + normalize_newlines, + set_injected_prologue, + get_results_filename, + convert_device_shortcut_to_real_device, + get_test_instance, + prepare_script_for_target, + create_test_report, +) RV32_ARCH_FLAGS = { "zba": 1 << 0, "zcmp": 1 << 1, } - -def base_path(*p): - return os.path.abspath(os.path.join(BASEPATH, *p)).replace("\\", "/") - - # Tests require at least CPython 3.3. If your default python3 executable # is of lower version, you can point MICROPY_CPYTHON3 environment var # to the correct executable. @@ -40,88 +45,22 @@ if os.name == "nt": MICROPYTHON = os.getenv( "MICROPY_MICROPYTHON", base_path("../ports/windows/build-standard/micropython.exe") ) - # mpy-cross is only needed if --via-mpy command-line arg is passed - MPYCROSS = os.getenv("MICROPY_MPYCROSS", base_path("../mpy-cross/build/mpy-cross.exe")) else: CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3") MICROPYTHON = os.getenv( "MICROPY_MICROPYTHON", base_path("../ports/unix/build-standard/micropython") ) - # mpy-cross is only needed if --via-mpy command-line arg is passed - MPYCROSS = os.getenv("MICROPY_MPYCROSS", base_path("../mpy-cross/build/mpy-cross")) # Use CPython options to not save .pyc files, to only access the core standard library # (not site packages which may clash with u-module names), and improve start up time. CPYTHON3_CMD = [CPYTHON3, "-BS"] -# File with the test results. -RESULTS_FILE = "_results.json" - # For diff'ing test output DIFF = os.getenv("MICROPY_DIFF", "diff -u") # Set PYTHONIOENCODING so that CPython will use utf-8 on systems which set another encoding in the locale os.environ["PYTHONIOENCODING"] = "utf-8" - -def normalize_newlines(data): - """Normalize newline variations to \\n. - - Only normalizes actual line endings, not literal \\r characters in strings. - Handles \\r\\r\\n and \\r\\n cases to ensure consistent comparison - across different platforms and terminals. - """ - if isinstance(data, bytes): - # Handle PTY double-newline issue first - data = data.replace(b"\r\r\n", b"\n") - # Then handle standard Windows line endings - data = data.replace(b"\r\n", b"\n") - # Don't convert standalone \r as it might be literal content - return data - - -# Code to allow a target MicroPython to import an .mpy from RAM -# Note: the module is named `__injected_test` but it needs to have `__name__` set to -# `__main__` so that the test sees itself as the main module, eg so unittest works. -injected_import_hook_code = """\ -import sys, os, io, vfs -class __File(io.IOBase): - def __init__(self): - module = sys.modules['__injected_test'] - module.__name__ = '__main__' - sys.modules['__main__'] = module - self.off = 0 - def ioctl(self, request, arg): - if request == 4: # MP_STREAM_CLOSE - return 0 - return -1 - def readinto(self, buf): - buf[:] = memoryview(__buf)[self.off:self.off + len(buf)] - self.off += len(buf) - return len(buf) -class __FS: - def mount(self, readonly, mkfs): - pass - def umount(self): - pass - def chdir(self, path): - pass - def getcwd(self): - return "" - def stat(self, path): - if path == '__injected_test.mpy': - return (0,0,0,0,0,0,0,0,0,0) - else: - raise OSError(2) # ENOENT - def open(self, path, mode): - self.stat(path) - return __File() -vfs.mount(__FS(), '/__vfstest') -os.chdir('/__vfstest') -{import_prologue} -__import__('__injected_test') -""" - # Platforms associated with the unix port, values of `sys.platform`. PC_PLATFORMS = ("darwin", "linux", "win32") @@ -337,11 +276,6 @@ tests_requiring_target_wiring = ( ) -def rm_f(fname): - if os.path.exists(fname): - os.remove(fname) - - # unescape wanted regex chars and escape unwanted ones def convert_regex_escapes(line): cs = [] @@ -366,38 +300,6 @@ def platform_to_port(platform): return platform_to_port_map.get(platform, platform) -def convert_device_shortcut_to_real_device(device): - if device.startswith("port:"): - return device.split(":", 1)[1] - elif device.startswith("a") and device[1:].isdigit(): - return "/dev/ttyACM" + device[1:] - elif device.startswith("u") and device[1:].isdigit(): - return "/dev/ttyUSB" + device[1:] - elif device.startswith("c") and device[1:].isdigit(): - return "COM" + device[1:] - else: - return device - - -def get_test_instance(test_instance, baudrate, user, password): - if test_instance == "unix": - return None - elif test_instance == "webassembly": - return PyboardNodeRunner() - else: - # Assume it's a device path. - port = convert_device_shortcut_to_real_device(test_instance) - - global pyboard - sys.path.append(base_path("../tools")) - import pyboard - - pyb = pyboard.Pyboard(port, baudrate, user, password) - pyboard.Pyboard.run_script_on_remote_target = run_script_on_remote_target - pyb.enter_raw_repl() - return pyb - - def detect_inline_asm_arch(pyb, args): for arch in ("rv32", "thumb", "xtensa"): output = run_feature_check(pyb, args, "inlineasm_{}.py".format(arch)) @@ -503,90 +405,6 @@ def detect_target_wiring_script(pyb, args): pyb.target_wiring_script = tw_data -def prepare_script_for_target(args, *, script_text=None, force_plain=False): - if force_plain or (not args.via_mpy and args.emit == "bytecode"): - # A plain test to run as-is, no processing needed. - pass - elif args.via_mpy: - tempname = tempfile.mktemp(dir="") - mpy_filename = tempname + ".mpy" - - script_filename = tempname + ".py" - with open(script_filename, "wb") as f: - f.write(script_text) - - try: - subprocess.check_output( - [MPYCROSS] - + args.mpy_cross_flags.split() - + ["-o", mpy_filename, "-X", "emit=" + args.emit, script_filename], - stderr=subprocess.STDOUT, - ) - except subprocess.CalledProcessError as er: - return True, b"mpy-cross crash\n" + er.output - - with open(mpy_filename, "rb") as f: - script_text = b"__buf=" + bytes(repr(f.read()), "ascii") + b"\n" - - rm_f(mpy_filename) - rm_f(script_filename) - - script_text += bytes(injected_import_hook_code, "ascii") - else: - print("error: using emit={} must go via .mpy".format(args.emit)) - sys.exit(1) - - return False, script_text - - -def run_script_on_remote_target(pyb, args, test_file, is_special): - with open(test_file, "rb") as f: - script = f.read() - - # If the test is not a special test, prepend it with a print to indicate that it started. - # If the print does not execute this means that the test did not even start, eg it was - # too large for the target. - prepend_start_test = not is_special - if prepend_start_test: - if script.startswith(b"#"): - script = b"print('START TEST')" + script - else: - script = b"print('START TEST')\n" + script - - had_crash, script = prepare_script_for_target(args, script_text=script, force_plain=is_special) - - if had_crash: - return True, script - - try: - had_crash = False - pyb.enter_raw_repl() - if test_file.endswith(tests_requiring_target_wiring) and pyb.target_wiring_script: - pyb.exec_( - "import sys;sys.modules['target_wiring']=__build_class__(lambda:exec(" - + repr(pyb.target_wiring_script) - + "),'target_wiring')" - ) - output_mupy = pyb.exec_(script, timeout=TEST_TIMEOUT) - except pyboard.PyboardError as e: - had_crash = True - if not is_special and e.args[0] == "exception": - if prepend_start_test and e.args[1] == b"" and b"MemoryError" in e.args[2]: - output_mupy = b"SKIP-TOO-LARGE\n" - else: - output_mupy = e.args[1] + e.args[2] + b"CRASH" - else: - output_mupy = bytes(e.args[0], "ascii") + b"\nCRASH" - - if prepend_start_test: - if output_mupy.startswith(b"START TEST\r\n"): - output_mupy = output_mupy.removeprefix(b"START TEST\r\n") - else: - had_crash = True - - return had_crash, output_mupy - - tests_with_regex_output = [ base_path(file) for file in ( @@ -722,8 +540,9 @@ def run_micropython(pyb, args, test_file, test_file_abspath, is_special=False): else: # run via pyboard interface + requires_target_wiring = test_file.endswith(tests_requiring_target_wiring) had_crash, output_mupy = pyb.run_script_on_remote_target( - args, test_file_abspath, is_special + args, test_file_abspath, is_special, requires_target_wiring ) # canonical form for all ports/platforms is to use \n for end-of-line @@ -813,51 +632,6 @@ class ThreadSafeCounter: return self._value -class PyboardNodeRunner: - def __init__(self): - mjs = os.getenv("MICROPY_MICROPYTHON_MJS") - if mjs is None: - mjs = base_path("../ports/webassembly/build-standard/micropython.mjs") - else: - mjs = os.path.abspath(mjs) - self.micropython_mjs = mjs - - def close(self): - pass - - def run_script_on_remote_target(self, args, test_file, is_special): - cwd = os.path.dirname(test_file) - - # Create system command list. - cmdlist = ["node"] - if test_file.endswith(".py"): - # Run a Python script indirectly via "node micropython.mjs ". - cmdlist.append(self.micropython_mjs) - if args.heapsize is not None: - cmdlist.extend(["-X", "heapsize=" + args.heapsize]) - cmdlist.append(test_file) - else: - # Run a js/mjs script directly with Node, passing in the path to micropython.mjs. - cmdlist.append(test_file) - cmdlist.append(self.micropython_mjs) - - # Run the script. - try: - had_crash = False - output_mupy = subprocess.check_output( - cmdlist, stderr=subprocess.STDOUT, timeout=TEST_TIMEOUT, cwd=cwd - ) - except subprocess.CalledProcessError as er: - had_crash = True - output_mupy = er.output + b"CRASH" - except subprocess.TimeoutExpired as er: - had_crash = True - output_mupy = (er.output or b"") + b"TIMEOUT" - - # Return the results. - return had_crash, output_mupy - - def run_tests(pyb, tests, args, result_dir, num_threads=1): testcase_count = ThreadSafeCounter() test_results = ThreadSafeCounter([]) @@ -1257,70 +1031,6 @@ def run_tests(pyb, tests, args, result_dir, num_threads=1): return test_results.value, testcase_count.value -# Print a summary of the results and save them to a JSON file. -# Returns True if everything succeeded, False otherwise. -def create_test_report(args, test_results, testcase_count=None): - passed_tests = list(r for r in test_results if r[1] == "pass") - skipped_tests = list(r for r in test_results if r[1] == "skip" and r[2] != "too large") - skipped_tests_too_large = list( - r for r in test_results if r[1] == "skip" and r[2] == "too large" - ) - failed_tests = list(r for r in test_results if r[1] == "fail") - - num_tests_performed = len(passed_tests) + len(failed_tests) - - testcase_count_info = "" - if testcase_count is not None: - testcase_count_info = " ({} individual testcases)".format(testcase_count) - print("{} tests performed{}".format(num_tests_performed, testcase_count_info)) - - print("{} tests passed".format(len(passed_tests))) - - if len(skipped_tests) > 0: - print( - "{} tests skipped: {}".format( - len(skipped_tests), " ".join(test[0] for test in skipped_tests) - ) - ) - - if len(skipped_tests_too_large) > 0: - print( - "{} tests skipped because they are too large: {}".format( - len(skipped_tests_too_large), " ".join(test[0] for test in skipped_tests_too_large) - ) - ) - - if len(failed_tests) > 0: - print( - "{} tests failed: {}".format( - len(failed_tests), " ".join(test[0] for test in failed_tests) - ) - ) - - # Serialize regex added by append_filter. - def to_json(obj): - if isinstance(obj, re.Pattern): - return obj.pattern - return obj - - with open(os.path.join(args.result_dir, RESULTS_FILE), "w") as f: - json.dump( - { - # The arguments passed on the command-line. - "args": vars(args), - # A list of all results of the form [(test, result, reason), ...]. - "results": list(test for test in test_results), - # A list of failed tests. This is deprecated, use the "results" above instead. - "failed_tests": [test[0] for test in failed_tests], - }, - f, - default=to_json, - ) - - # Return True only if all tests succeeded. - return len(failed_tests) == 0 - - class append_filter(argparse.Action): def __init__(self, option_strings, dest, **kwargs): super().__init__(option_strings, dest, default=[], **kwargs) @@ -1335,39 +1045,7 @@ class append_filter(argparse.Action): args.filters.append((option, re.compile(value))) -test_instance_description = """\ -By default the tests are run against the unix port of MicroPython. To run it -against something else, use the -t option. See below for details. -""" -test_instance_epilog = """\ -The -t option accepts the following for the test instance: -- unix - use the unix port of MicroPython, specified by the MICROPY_MICROPYTHON - environment variable (which defaults to the standard variant of either the unix - or windows ports, depending on the host platform) -- webassembly - use the webassembly port of MicroPython, specified by the - MICROPY_MICROPYTHON_MJS environment variable (which defaults to the standard - variant of the webassembly port) -- port: - connect to and use the given serial port device -- a - connect to and use /dev/ttyACM -- u - connect to and use /dev/ttyUSB -- c - connect to and use COM -- exec: - execute a command and attach to its stdin/stdout -- execpty: - execute a command and attach to the printed /dev/pts/ device -- ... - connect to the given IPv4 address -- anything else specifies a serial port -""" - -test_directory_description = """\ -Tests are discovered by scanning test directories for .py files or using the -specified test files. If test files nor directories are specified, the script -expects to be ran in the tests directory (where this file is located) and the -builtin tests suitable for the target platform are ran. -""" - - def main(): - global injected_import_hook_code - cmd_parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=f"""Run and manage tests for MicroPython. @@ -1474,7 +1152,7 @@ the last matching regex is used: if args.begin: with open(args.begin, "rt") as source: prologue = source.read() - injected_import_hook_code = injected_import_hook_code.replace("{import_prologue}", prologue) + set_injected_prologue(prologue) if args.print_failures: for out in glob(os.path.join(args.result_dir, "*.out")): @@ -1497,7 +1175,7 @@ the last matching regex is used: os.path.join(args.result_dir, "*.out") ): os.remove(f) - rm_f(os.path.join(args.result_dir, RESULTS_FILE)) + rm_f(get_results_filename(args)) sys.exit(0) @@ -1513,7 +1191,7 @@ the last matching regex is used: ) if args.run_failures: - results_file = os.path.join(args.result_dir, RESULTS_FILE) + results_file = get_results_filename(args) if os.path.exists(results_file): with open(results_file, "r") as f: tests = list(test[0] for test in json.load(f)["results"] if test[1] == "fail") diff --git a/tests/serial_test.py b/tests/serial_test.py index 3b5940d91a..eebea402fa 100755 --- a/tests/serial_test.py +++ b/tests/serial_test.py @@ -13,7 +13,7 @@ import serial import sys import time -run_tests_module = __import__("run-tests") +from test_utils import test_instance_epilog, convert_device_shortcut_to_real_device echo_test_script = """ import sys @@ -307,7 +307,7 @@ def main(): cmd_parser = argparse.ArgumentParser( description="Test performance and reliability of serial port communication.", - epilog=run_tests_module.test_instance_epilog, + epilog=test_instance_epilog, formatter_class=argparse.RawTextHelpFormatter, ) cmd_parser.add_argument( @@ -321,7 +321,7 @@ def main(): ) args = cmd_parser.parse_args() - dev_repl = run_tests_module.convert_device_shortcut_to_real_device(args.test_instance) + dev_repl = convert_device_shortcut_to_real_device(args.test_instance) test_passed = True try: diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000000..abbc670c12 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,358 @@ +# This file is part of the MicroPython project, http://micropython.org/ +# The MIT License (MIT) +# Copyright (c) 2019-2025 Damien P. George + +import inspect +import json +import os +import re +import subprocess +import sys +import tempfile + +# See stackoverflow.com/questions/2632199: __file__ nor sys.argv[0] +# are guaranteed to always work, this one should though. +_BASEPATH = os.path.dirname(os.path.abspath(inspect.getsourcefile(lambda: None))) + + +def base_path(*p): + return os.path.abspath(os.path.join(_BASEPATH, *p)).replace("\\", "/") + + +sys.path.append(base_path("../tools")) +import pyboard + +# File with the test results. +_RESULTS_FILE = "_results.json" + +# Maximum time to run a single test, in seconds. +TEST_TIMEOUT = float(os.environ.get("MICROPY_TEST_TIMEOUT", 30)) + +# mpy-cross is only needed if --via-mpy command-line arg is passed +if os.name == "nt": + MPYCROSS = os.getenv("MICROPY_MPYCROSS", base_path("../mpy-cross/build/mpy-cross.exe")) +else: + MPYCROSS = os.getenv("MICROPY_MPYCROSS", base_path("../mpy-cross/build/mpy-cross")) + +test_instance_description = """\ +By default the tests are run against the unix port of MicroPython. To run it +against something else, use the -t option. See below for details. +""" + +test_instance_epilog = """\ +The -t option accepts the following for the test instance: +- unix - use the unix port of MicroPython, specified by the MICROPY_MICROPYTHON + environment variable (which defaults to the standard variant of either the unix + or windows ports, depending on the host platform) +- webassembly - use the webassembly port of MicroPython, specified by the + MICROPY_MICROPYTHON_MJS environment variable (which defaults to the standard + variant of the webassembly port) +- port: - connect to and use the given serial port device +- a - connect to and use /dev/ttyACM +- u - connect to and use /dev/ttyUSB +- c - connect to and use COM +- exec: - execute a command and attach to its stdin/stdout +- execpty: - execute a command and attach to the printed /dev/pts/ device +- ... - connect to the given IPv4 address +- anything else specifies a serial port +""" + +test_directory_description = """\ +Tests are discovered by scanning test directories for .py files or using the +specified test files. If test files nor directories are specified, the script +expects to be ran in the tests directory (where this file is located) and the +builtin tests suitable for the target platform are ran. +""" + +# Code to allow a target MicroPython to import an .mpy from RAM +# Note: the module is named `__injected_test` but it needs to have `__name__` set to +# `__main__` so that the test sees itself as the main module, eg so unittest works. +_injected_import_hook_code = """\ +import sys, os, io, vfs +class __File(io.IOBase): + def __init__(self): + module = sys.modules['__injected_test'] + module.__name__ = '__main__' + sys.modules['__main__'] = module + self.off = 0 + def ioctl(self, request, arg): + if request == 4: # MP_STREAM_CLOSE + return 0 + return -1 + def readinto(self, buf): + buf[:] = memoryview(__buf)[self.off:self.off + len(buf)] + self.off += len(buf) + return len(buf) +class __FS: + def mount(self, readonly, mkfs): + pass + def umount(self): + pass + def chdir(self, path): + pass + def getcwd(self): + return "" + def stat(self, path): + if path == '__injected_test.mpy': + return (0,0,0,0,0,0,0,0,0,0) + else: + raise OSError(2) # ENOENT + def open(self, path, mode): + self.stat(path) + return __File() +vfs.mount(__FS(), '/__vfstest') +os.chdir('/__vfstest') +{import_prologue} +__import__('__injected_test') +""" + + +class PyboardNodeRunner: + def __init__(self): + mjs = os.getenv("MICROPY_MICROPYTHON_MJS") + if mjs is None: + mjs = base_path("../ports/webassembly/build-standard/micropython.mjs") + else: + mjs = os.path.abspath(mjs) + self.micropython_mjs = mjs + + def close(self): + pass + + def run_script_on_remote_target(self, args, test_file, is_special, requires_target_wiring): + cwd = os.path.dirname(test_file) + + # Create system command list. + cmdlist = ["node"] + if test_file.endswith(".py"): + # Run a Python script indirectly via "node micropython.mjs ". + cmdlist.append(self.micropython_mjs) + if args.heapsize is not None: + cmdlist.extend(["-X", "heapsize=" + args.heapsize]) + cmdlist.append(test_file) + else: + # Run a js/mjs script directly with Node, passing in the path to micropython.mjs. + cmdlist.append(test_file) + cmdlist.append(self.micropython_mjs) + + # Run the script. + try: + had_crash = False + output_mupy = subprocess.check_output( + cmdlist, stderr=subprocess.STDOUT, timeout=TEST_TIMEOUT, cwd=cwd + ) + except subprocess.CalledProcessError as er: + had_crash = True + output_mupy = er.output + b"CRASH" + except subprocess.TimeoutExpired as er: + had_crash = True + output_mupy = (er.output or b"") + b"TIMEOUT" + + # Return the results. + return had_crash, output_mupy + + +def rm_f(fname): + if os.path.exists(fname): + os.remove(fname) + + +def normalize_newlines(data): + """Normalize newline variations to \\n. + + Only normalizes actual line endings, not literal \\r characters in strings. + Handles \\r\\r\\n and \\r\\n cases to ensure consistent comparison + across different platforms and terminals. + """ + if isinstance(data, bytes): + # Handle PTY double-newline issue first + data = data.replace(b"\r\r\n", b"\n") + # Then handle standard Windows line endings + data = data.replace(b"\r\n", b"\n") + # Don't convert standalone \r as it might be literal content + return data + + +def set_injected_prologue(prologue): + global _injected_import_hook_code + _injected_import_hook_code = _injected_import_hook_code.replace("{import_prologue}", prologue) + + +def get_results_filename(args): + return os.path.join(args.result_dir, _RESULTS_FILE) + + +def convert_device_shortcut_to_real_device(device): + if device.startswith("port:"): + return device.split(":", 1)[1] + elif device.startswith("a") and device[1:].isdigit(): + return "/dev/ttyACM" + device[1:] + elif device.startswith("u") and device[1:].isdigit(): + return "/dev/ttyUSB" + device[1:] + elif device.startswith("c") and device[1:].isdigit(): + return "COM" + device[1:] + else: + return device + + +def get_test_instance(test_instance, baudrate, user, password): + if test_instance == "unix": + return None + elif test_instance == "webassembly": + return PyboardNodeRunner() + else: + # Assume it's a device path. + port = convert_device_shortcut_to_real_device(test_instance) + + pyb = pyboard.Pyboard(port, baudrate, user, password) + pyboard.Pyboard.run_script_on_remote_target = run_script_on_remote_target + pyb.enter_raw_repl() + return pyb + + +def prepare_script_for_target(args, *, script_text=None, force_plain=False): + if force_plain or (not args.via_mpy and args.emit == "bytecode"): + # A plain test to run as-is, no processing needed. + pass + elif args.via_mpy: + tempname = tempfile.mktemp(dir="") + mpy_filename = tempname + ".mpy" + + script_filename = tempname + ".py" + with open(script_filename, "wb") as f: + f.write(script_text) + + try: + subprocess.check_output( + [MPYCROSS] + + args.mpy_cross_flags.split() + + ["-o", mpy_filename, "-X", "emit=" + args.emit, script_filename], + stderr=subprocess.STDOUT, + ) + except subprocess.CalledProcessError as er: + return True, b"mpy-cross crash\n" + er.output + + with open(mpy_filename, "rb") as f: + script_text = b"__buf=" + bytes(repr(f.read()), "ascii") + b"\n" + + rm_f(mpy_filename) + rm_f(script_filename) + + script_text += bytes(_injected_import_hook_code, "ascii") + else: + print("error: using emit={} must go via .mpy".format(args.emit)) + sys.exit(1) + + return False, script_text + + +def run_script_on_remote_target(pyb, args, test_file, is_special, requires_target_wiring): + with open(test_file, "rb") as f: + script = f.read() + + # If the test is not a special test, prepend it with a print to indicate that it started. + # If the print does not execute this means that the test did not even start, eg it was + # too large for the target. + prepend_start_test = not is_special + if prepend_start_test: + if script.startswith(b"#"): + script = b"print('START TEST')" + script + else: + script = b"print('START TEST')\n" + script + + had_crash, script = prepare_script_for_target(args, script_text=script, force_plain=is_special) + + if had_crash: + return True, script + + try: + had_crash = False + pyb.enter_raw_repl() + if requires_target_wiring and pyb.target_wiring_script: + pyb.exec_( + "import sys;sys.modules['target_wiring']=__build_class__(lambda:exec(" + + repr(pyb.target_wiring_script) + + "),'target_wiring')" + ) + output_mupy = pyb.exec_(script, timeout=TEST_TIMEOUT) + except pyboard.PyboardError as e: + had_crash = True + if not is_special and e.args[0] == "exception": + if prepend_start_test and e.args[1] == b"" and b"MemoryError" in e.args[2]: + output_mupy = b"SKIP-TOO-LARGE\n" + else: + output_mupy = e.args[1] + e.args[2] + b"CRASH" + else: + output_mupy = bytes(e.args[0], "ascii") + b"\nCRASH" + + if prepend_start_test: + if output_mupy.startswith(b"START TEST\r\n"): + output_mupy = output_mupy.removeprefix(b"START TEST\r\n") + else: + had_crash = True + + return had_crash, output_mupy + + +# Print a summary of the results and save them to a JSON file. +# Returns True if everything succeeded, False otherwise. +def create_test_report(args, test_results, testcase_count=None): + passed_tests = list(r for r in test_results if r[1] == "pass") + skipped_tests = list(r for r in test_results if r[1] == "skip" and r[2] != "too large") + skipped_tests_too_large = list( + r for r in test_results if r[1] == "skip" and r[2] == "too large" + ) + failed_tests = list(r for r in test_results if r[1] == "fail") + + num_tests_performed = len(passed_tests) + len(failed_tests) + + testcase_count_info = "" + if testcase_count is not None: + testcase_count_info = " ({} individual testcases)".format(testcase_count) + print("{} tests performed{}".format(num_tests_performed, testcase_count_info)) + + print("{} tests passed".format(len(passed_tests))) + + if len(skipped_tests) > 0: + print( + "{} tests skipped: {}".format( + len(skipped_tests), " ".join(test[0] for test in skipped_tests) + ) + ) + + if len(skipped_tests_too_large) > 0: + print( + "{} tests skipped because they are too large: {}".format( + len(skipped_tests_too_large), " ".join(test[0] for test in skipped_tests_too_large) + ) + ) + + if len(failed_tests) > 0: + print( + "{} tests failed: {}".format( + len(failed_tests), " ".join(test[0] for test in failed_tests) + ) + ) + + # Serialize regex added by append_filter. + def to_json(obj): + if isinstance(obj, re.Pattern): + return obj.pattern + return obj + + with open(get_results_filename(args), "w") as f: + json.dump( + { + # The arguments passed on the command-line. + "args": vars(args), + # A list of all results of the form [(test, result, reason), ...]. + "results": list(test for test in test_results), + # A list of failed tests. This is deprecated, use the "results" above instead. + "failed_tests": [test[0] for test in failed_tests], + }, + f, + default=to_json, + ) + + # Return True only if all tests succeeded. + return len(failed_tests) == 0