diff --git a/ports/stm32/pyb_can.c b/ports/stm32/pyb_can.c index 2d676b23d2..d60c3c42d4 100644 --- a/ports/stm32/pyb_can.c +++ b/ports/stm32/pyb_can.c @@ -80,7 +80,7 @@ #endif #else -#define CAN_MAX_FILTER (28) +#define CAN_MAX_FILTER_CAN1_2 (28) // This limit is the max across CAN1+CAN2 shared indexing #define CAN_MAX_DATA_FRAME (8) #define CAN_DEFAULT_PRESCALER (100) @@ -232,12 +232,8 @@ static mp_obj_t pyb_can_init_helper(pyb_can_obj_t *self, size_t n_args, const mp self->can.Init.DataTimeSeg1 = args[ARG_brs_bs1].u_int; // DataTimeSeg1 = Propagation_segment + Phase_segment_1 self->can.Init.DataTimeSeg2 = args[ARG_brs_bs2].u_int; #else - // Init filter banks for classic CAN. + // set the can2_start_bank value can2_start_bank = args[ARG_num_filter_banks].u_int; - int bank_offs = (self->can_id == 2) ? can2_start_bank : 0; - for (int f = 0; f < CAN_MAX_FILTER; f++) { - can_clearfilter(&self->can, f + bank_offs, can2_start_bank); - } #endif mp_uint_t mode = args[ARG_mode].u_int; @@ -307,6 +303,33 @@ static MP_DEFINE_CONST_FUN_OBJ_KW(pyb_can_init_obj, 1, pyb_can_init); // deinit() static mp_obj_t pyb_can_deinit(mp_obj_t self_in) { pyb_can_obj_t *self = MP_OBJ_TO_PTR(self_in); + + #if !MICROPY_HW_ENABLE_FDCAN + // Clear filter banks for classic CAN, in case of re-init + int filter_min, filter_max; + switch (self->can_id) { + #ifdef MICROPY_HW_CAN3_NAME + case 3: // CAN3 filter numbering is independent of 1+2 + filter_min = 0; + filter_max = CAN_HW_MAX_FILTER; + break; + #endif + #ifdef MICROPY_HW_CAN2_NAME + case 2: // CAN2 filters run from can2_start_bank to the max + filter_min = can2_start_bank; + filter_max = CAN_MAX_FILTER_CAN1_2; + break; + #endif + default: // CAN1 filters run from 0 to can2_start_bank + filter_min = 0; + filter_max = can2_start_bank; + break; + } + for (int f = filter_min; f < filter_max; f++) { + can_clearfilter(&self->can, f, can2_start_bank); + } + #endif + can_deinit(&self->can); self->is_enabled = false; return mp_const_none; diff --git a/tests/extmod_hardware/machine_can2.py b/tests/extmod_hardware/machine_can2.py deleted file mode 100644 index 0ecced8286..0000000000 --- a/tests/extmod_hardware/machine_can2.py +++ /dev/null @@ -1,44 +0,0 @@ -# Test machine.CAN(1) and machine.CAN(2) using loopback -# -# Single device test, assumes support for loopback and no connections to the CAN pins -# -# This test is ported from tests/ports/stm32/pyb_can2.py - -try: - from machine import CAN - - CAN(2, 125_000) -except (ImportError, ValueError): - print("SKIP") - raise SystemExit - -import time - -# Setting up each CAN peripheral independently is deliberate here, to catch -# catch cases where initialising CAN2 breaks CAN1 - -can1 = CAN(1, 125_000, mode=CAN.MODE_LOOPBACK) -can1.set_filters([(0x100, 0x700, 0)]) - -can2 = CAN(2, 125_000, mode=CAN.MODE_LOOPBACK) -can2.set_filters([(0x000, 0x7F0, 0)]) - -# Drain any old messages in RX FIFOs -for can in (can1, can2): - while can.recv(): - pass - -for id, can in ((1, can1), (2, can2)): - print("testing", id) - # message1 should only receive on can1, message2 on can2 - can.send(0x123, b"message1", 0) - can.send(0x003, "message2", 0) - time.sleep_ms(10) - did_recv = False - while res := can.recv(): - did_recv = True - print(hex(res[0]), bytes(res[1]), res[2], res[3]) - if not did_recv: - print("no rx!") - -print("done") diff --git a/tests/extmod_hardware/machine_can2.py.exp b/tests/extmod_hardware/machine_can2.py.exp deleted file mode 100644 index bfb6a5088b..0000000000 --- a/tests/extmod_hardware/machine_can2.py.exp +++ /dev/null @@ -1,5 +0,0 @@ -testing 1 -0x123 b'message1' 0 0 -testing 2 -0x3 b'message2' 0 0 -done diff --git a/tests/extmod_hardware/machine_can_instances.py b/tests/extmod_hardware/machine_can_instances.py new file mode 100644 index 0000000000..e280466e54 --- /dev/null +++ b/tests/extmod_hardware/machine_can_instances.py @@ -0,0 +1,74 @@ +# Test multiple concurrent CAN instances using loopback. +# Initialising in any order shouldn't break TX, RX or filtering. +# +# This test is ported from tests/ports/stm32/pyb_can_instances.py + +try: + from machine import CAN + + CAN(2, 125_000) # skip any board which doesn't have at least 2 CAN peripherals +except (ImportError, ValueError): + print("SKIP") + raise SystemExit + +import time +import unittest + +# Some boards have 3x CAN peripherals, test all three +HAS_CAN3 = True +try: + CAN(3, 125_000) +except ValueError: + HAS_CAN3 = False + + +class Test(unittest.TestCase): + def test_can12(self): + self._test_pairs([(1, 2), (2, 1)]) + + @unittest.skipUnless(HAS_CAN3, "no CAN3") + def test_can3(self): + self._test_pairs([(1, 3), (3, 1), (2, 3), (3, 2)]) + + def _test_pairs(self, seq): + for id_a, id_b in seq: + with self.subTest("Testing CAN pair", id_a=id_a, id_b=id_b): + self._test_controller_pair(id_a, id_b) + + def _test_controller_pair(self, id_a, id_b): + # Setting up each CAN peripheral independently is deliberate here, to catch + # catch cases where initialising CAN2 breaks CAN1 or vice versa + can_a = CAN(id_a, 125_000, mode=CAN.MODE_LOOPBACK) + can_a.set_filters([(0x100, 0x700, 0)]) + + can_b = CAN(id_b, 125_000, mode=CAN.MODE_LOOPBACK) + can_b.set_filters([(0x000, 0x7F0, 0)]) + + try: + # Drain any old messages in RX FIFOs + for can in (can_a, can_b): + while can.recv(): + pass + + for which, id, can in (("A", id_a, can_a), ("B", id_b, can_b)): + # print("testing config", which, "with controller", can) + # message1 should only receive on can_a, message2 on can_b + can.send(0x123, "message1", 0) + can.send(0x003, "message2", 0) + time.sleep_ms(10) + n_recv = 0 + while res := can.recv(): + n_recv += 1 + # print(res) + if can == can_a: + self.assertEqual(res[1], b"message1", "can_a should receive message1 only") + if can == can_b: + self.assertEqual(res[1], b"message2", "can_b should receive message2 only") + self.assertEqual(n_recv, 1, "Each instance should receive exactly 1 message") + finally: + can_a.deinit() + can_b.deinit() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/ports/stm32/pyb_can2.py b/tests/ports/stm32/pyb_can2.py deleted file mode 100644 index 62ae935357..0000000000 --- a/tests/ports/stm32/pyb_can2.py +++ /dev/null @@ -1,50 +0,0 @@ -try: - from pyb import CAN - - CAN(2) -except (ImportError, ValueError): - print("SKIP") - raise SystemExit - -# Classic CAN (aka bxCAN) hardware has a different filter API -# and some different behaviours to newer FDCAN hardware -IS_CLASSIC = hasattr(CAN, "MASK16") - -# Setting up each CAN peripheral independently is deliberate here, to catch -# catch cases where initialising CAN2 breaks CAN1 - -can1 = CAN(1, CAN.LOOPBACK) -if IS_CLASSIC: - can1.setfilter(0, CAN.LIST16, 0, (123, 124, 125, 126)) -else: - can1.setfilter(0, CAN.RANGE, 0, (123, 126)) - -can2 = CAN(2, CAN.LOOPBACK) -if IS_CLASSIC: - can2.setfilter(0, CAN.LIST16, 0, (3, 4, 5, 6)) -else: - can2.setfilter(0, CAN.RANGE, 0, (3, 6)) - -# Drain any old messages in RX FIFOs -for can in (can1, can2): - while can.any(0): - can.recv(0) - -for id, can in ((1, can1), (2, can2)): - print("testing", id) - # message1 should only receive on can1, message2 on can2 - can.send("message1", 123) - can.send("message2", 3) - did_recv = False - try: - while True: - res = can.recv(0, timeout=50) - # not printing all of 'res' as the filter index result is different - # on Classic vs FD-CAN - print("rx", res[0], res[4]) - did_recv = True - except OSError: - if not did_recv: - print("no rx!") - -print("done") diff --git a/tests/ports/stm32/pyb_can2.py.exp b/tests/ports/stm32/pyb_can2.py.exp deleted file mode 100644 index 9696f2fa01..0000000000 --- a/tests/ports/stm32/pyb_can2.py.exp +++ /dev/null @@ -1,5 +0,0 @@ -testing 1 -rx 123 b'message1' -testing 2 -rx 3 b'message2' -done diff --git a/tests/ports/stm32/pyb_can_instances.py b/tests/ports/stm32/pyb_can_instances.py new file mode 100644 index 0000000000..a5f6d3d8d5 --- /dev/null +++ b/tests/ports/stm32/pyb_can_instances.py @@ -0,0 +1,90 @@ +# Test that initialising CAN instances in any order doesn't break +# TX, RX or filtering +try: + from pyb import CAN + + CAN(2) # skip any board which doesn't have at least 2 CAN peripherals +except (ImportError, ValueError): + print("SKIP") + raise SystemExit + +import unittest +import errno + +# Classic CAN (aka bxCAN) hardware has a different filter API +# and some different behaviours to newer FDCAN hardware +IS_CLASSIC = hasattr(CAN, "MASK16") + +# Some boards have 3x CAN peripherals, test all three +HAS_CAN3 = True +try: + CAN(3) +except ValueError: + HAS_CAN3 = False + + +class Test(unittest.TestCase): + def test_can12(self): + self._test_pairs([(1, 2), (2, 1)]) + + @unittest.skipUnless(HAS_CAN3, "no CAN3") + def test_can3(self): + self._test_pairs([(1, 3), (3, 1), (2, 3), (3, 2)]) + + def _test_pairs(self, seq): + for id_a, id_b in seq: + with self.subTest("Testing CAN pair", id_a=id_a, id_b=id_b): + self._test_controller_pair(id_a, id_b) + + def _test_controller_pair(self, id_a, id_b): + # Setting up each CAN peripheral independently is deliberate here, to catch + # catch cases where initialising CAN2 breaks CAN1 or vice versa + try: + can_a = CAN(id_a, CAN.LOOPBACK) + if IS_CLASSIC: + can_a.setfilter(0, CAN.LIST16, 0, (123, 124, 125, 126)) + else: + can_a.setfilter(0, CAN.RANGE, 0, (123, 126)) + + can_b = CAN(id_b, CAN.LOOPBACK) + if IS_CLASSIC: + can_b.setfilter(0, CAN.LIST16, 0, (3, 4, 5, 6)) + else: + can_b.setfilter(0, CAN.RANGE, 0, (3, 6)) + + # Drain any old messages in RX FIFOs + for can in (can_a, can_b): + while can.any(0): + can.recv(0) + + for which, id, can in (("A", id_a, can_a), ("B", id_b, can_b)): + print("testing config", which, "with controller", can) + # message1 should only receive on can_a, message2 on can_b + can.send("message1", 123) + can.send("message2", 3) + n_recv = 0 + try: + while True: + res = can.recv(0, timeout=50) + n_recv += 1 + print("received", res) + if can == can_a: + self.assertEqual( + res[4], b"message1", "can_a should receive message1 only" + ) + if can == can_b: + self.assertEqual( + res[4], b"message2", "can_b should receive message2 only" + ) + except OSError as e: + if e.errno != errno.ETIMEDOUT: + raise + print("recv timed out") + self.assertEqual(n_recv, 1, "Each instance should receive exactly 1 message") + finally: + can_a.deinit() + can_b.deinit() + + +if __name__ == "__main__": + unittest.main()