stm32: Fix case where initialising Classic CAN1 corrupts CAN2,3.

- CAN1 init would clear all filters including CAN2 filter range.
- CAN3 init would call can_clearfilter() with empty self->can values,
  the HAL layer interpreted this as clearing all CAN1 filters.

To fix this clearing filter banks is moved to deinit, so they're already
clean before the next init (they should be clear on initial init, due to
peripheral reset).

The only corner case is that if you initialise CAN1 and set too many
filters, initialise CAN2, then some of the CAN1 filters may now apply to
CAN2. However this would not have worked correctly in the current version
either (the extra CAN1 filters would have been silently cleared).

Includes expanded unit tests to cover arbitrary pairs of CAN instances.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
This commit is contained in:
Angus Gratton
2026-03-25 10:36:56 +11:00
committed by Damien George
parent 7d77d3e0dd
commit dfdc71ffce
7 changed files with 193 additions and 110 deletions
+29 -6
View File
@@ -80,7 +80,7 @@
#endif
#else
#define CAN_MAX_FILTER (28)
#define CAN_MAX_FILTER_CAN1_2 (28) // This limit is the max across CAN1+CAN2 shared indexing
#define CAN_MAX_DATA_FRAME (8)
#define CAN_DEFAULT_PRESCALER (100)
@@ -232,12 +232,8 @@ static mp_obj_t pyb_can_init_helper(pyb_can_obj_t *self, size_t n_args, const mp
self->can.Init.DataTimeSeg1 = args[ARG_brs_bs1].u_int; // DataTimeSeg1 = Propagation_segment + Phase_segment_1
self->can.Init.DataTimeSeg2 = args[ARG_brs_bs2].u_int;
#else
// Init filter banks for classic CAN.
// set the can2_start_bank value
can2_start_bank = args[ARG_num_filter_banks].u_int;
int bank_offs = (self->can_id == 2) ? can2_start_bank : 0;
for (int f = 0; f < CAN_MAX_FILTER; f++) {
can_clearfilter(&self->can, f + bank_offs, can2_start_bank);
}
#endif
mp_uint_t mode = args[ARG_mode].u_int;
@@ -307,6 +303,33 @@ static MP_DEFINE_CONST_FUN_OBJ_KW(pyb_can_init_obj, 1, pyb_can_init);
// deinit()
static mp_obj_t pyb_can_deinit(mp_obj_t self_in) {
pyb_can_obj_t *self = MP_OBJ_TO_PTR(self_in);
#if !MICROPY_HW_ENABLE_FDCAN
// Clear filter banks for classic CAN, in case of re-init
int filter_min, filter_max;
switch (self->can_id) {
#ifdef MICROPY_HW_CAN3_NAME
case 3: // CAN3 filter numbering is independent of 1+2
filter_min = 0;
filter_max = CAN_HW_MAX_FILTER;
break;
#endif
#ifdef MICROPY_HW_CAN2_NAME
case 2: // CAN2 filters run from can2_start_bank to the max
filter_min = can2_start_bank;
filter_max = CAN_MAX_FILTER_CAN1_2;
break;
#endif
default: // CAN1 filters run from 0 to can2_start_bank
filter_min = 0;
filter_max = can2_start_bank;
break;
}
for (int f = filter_min; f < filter_max; f++) {
can_clearfilter(&self->can, f, can2_start_bank);
}
#endif
can_deinit(&self->can);
self->is_enabled = false;
return mp_const_none;
-44
View File
@@ -1,44 +0,0 @@
# Test machine.CAN(1) and machine.CAN(2) using loopback
#
# Single device test, assumes support for loopback and no connections to the CAN pins
#
# This test is ported from tests/ports/stm32/pyb_can2.py
try:
from machine import CAN
CAN(2, 125_000)
except (ImportError, ValueError):
print("SKIP")
raise SystemExit
import time
# Setting up each CAN peripheral independently is deliberate here, to catch
# catch cases where initialising CAN2 breaks CAN1
can1 = CAN(1, 125_000, mode=CAN.MODE_LOOPBACK)
can1.set_filters([(0x100, 0x700, 0)])
can2 = CAN(2, 125_000, mode=CAN.MODE_LOOPBACK)
can2.set_filters([(0x000, 0x7F0, 0)])
# Drain any old messages in RX FIFOs
for can in (can1, can2):
while can.recv():
pass
for id, can in ((1, can1), (2, can2)):
print("testing", id)
# message1 should only receive on can1, message2 on can2
can.send(0x123, b"message1", 0)
can.send(0x003, "message2", 0)
time.sleep_ms(10)
did_recv = False
while res := can.recv():
did_recv = True
print(hex(res[0]), bytes(res[1]), res[2], res[3])
if not did_recv:
print("no rx!")
print("done")
@@ -1,5 +0,0 @@
testing 1
0x123 b'message1' 0 0
testing 2
0x3 b'message2' 0 0
done
@@ -0,0 +1,74 @@
# Test multiple concurrent CAN instances using loopback.
# Initialising in any order shouldn't break TX, RX or filtering.
#
# This test is ported from tests/ports/stm32/pyb_can_instances.py
try:
from machine import CAN
CAN(2, 125_000) # skip any board which doesn't have at least 2 CAN peripherals
except (ImportError, ValueError):
print("SKIP")
raise SystemExit
import time
import unittest
# Some boards have 3x CAN peripherals, test all three
HAS_CAN3 = True
try:
CAN(3, 125_000)
except ValueError:
HAS_CAN3 = False
class Test(unittest.TestCase):
def test_can12(self):
self._test_pairs([(1, 2), (2, 1)])
@unittest.skipUnless(HAS_CAN3, "no CAN3")
def test_can3(self):
self._test_pairs([(1, 3), (3, 1), (2, 3), (3, 2)])
def _test_pairs(self, seq):
for id_a, id_b in seq:
with self.subTest("Testing CAN pair", id_a=id_a, id_b=id_b):
self._test_controller_pair(id_a, id_b)
def _test_controller_pair(self, id_a, id_b):
# Setting up each CAN peripheral independently is deliberate here, to catch
# catch cases where initialising CAN2 breaks CAN1 or vice versa
can_a = CAN(id_a, 125_000, mode=CAN.MODE_LOOPBACK)
can_a.set_filters([(0x100, 0x700, 0)])
can_b = CAN(id_b, 125_000, mode=CAN.MODE_LOOPBACK)
can_b.set_filters([(0x000, 0x7F0, 0)])
try:
# Drain any old messages in RX FIFOs
for can in (can_a, can_b):
while can.recv():
pass
for which, id, can in (("A", id_a, can_a), ("B", id_b, can_b)):
# print("testing config", which, "with controller", can)
# message1 should only receive on can_a, message2 on can_b
can.send(0x123, "message1", 0)
can.send(0x003, "message2", 0)
time.sleep_ms(10)
n_recv = 0
while res := can.recv():
n_recv += 1
# print(res)
if can == can_a:
self.assertEqual(res[1], b"message1", "can_a should receive message1 only")
if can == can_b:
self.assertEqual(res[1], b"message2", "can_b should receive message2 only")
self.assertEqual(n_recv, 1, "Each instance should receive exactly 1 message")
finally:
can_a.deinit()
can_b.deinit()
if __name__ == "__main__":
unittest.main()
-50
View File
@@ -1,50 +0,0 @@
try:
from pyb import CAN
CAN(2)
except (ImportError, ValueError):
print("SKIP")
raise SystemExit
# Classic CAN (aka bxCAN) hardware has a different filter API
# and some different behaviours to newer FDCAN hardware
IS_CLASSIC = hasattr(CAN, "MASK16")
# Setting up each CAN peripheral independently is deliberate here, to catch
# catch cases where initialising CAN2 breaks CAN1
can1 = CAN(1, CAN.LOOPBACK)
if IS_CLASSIC:
can1.setfilter(0, CAN.LIST16, 0, (123, 124, 125, 126))
else:
can1.setfilter(0, CAN.RANGE, 0, (123, 126))
can2 = CAN(2, CAN.LOOPBACK)
if IS_CLASSIC:
can2.setfilter(0, CAN.LIST16, 0, (3, 4, 5, 6))
else:
can2.setfilter(0, CAN.RANGE, 0, (3, 6))
# Drain any old messages in RX FIFOs
for can in (can1, can2):
while can.any(0):
can.recv(0)
for id, can in ((1, can1), (2, can2)):
print("testing", id)
# message1 should only receive on can1, message2 on can2
can.send("message1", 123)
can.send("message2", 3)
did_recv = False
try:
while True:
res = can.recv(0, timeout=50)
# not printing all of 'res' as the filter index result is different
# on Classic vs FD-CAN
print("rx", res[0], res[4])
did_recv = True
except OSError:
if not did_recv:
print("no rx!")
print("done")
-5
View File
@@ -1,5 +0,0 @@
testing 1
rx 123 b'message1'
testing 2
rx 3 b'message2'
done
+90
View File
@@ -0,0 +1,90 @@
# Test that initialising CAN instances in any order doesn't break
# TX, RX or filtering
try:
from pyb import CAN
CAN(2) # skip any board which doesn't have at least 2 CAN peripherals
except (ImportError, ValueError):
print("SKIP")
raise SystemExit
import unittest
import errno
# Classic CAN (aka bxCAN) hardware has a different filter API
# and some different behaviours to newer FDCAN hardware
IS_CLASSIC = hasattr(CAN, "MASK16")
# Some boards have 3x CAN peripherals, test all three
HAS_CAN3 = True
try:
CAN(3)
except ValueError:
HAS_CAN3 = False
class Test(unittest.TestCase):
def test_can12(self):
self._test_pairs([(1, 2), (2, 1)])
@unittest.skipUnless(HAS_CAN3, "no CAN3")
def test_can3(self):
self._test_pairs([(1, 3), (3, 1), (2, 3), (3, 2)])
def _test_pairs(self, seq):
for id_a, id_b in seq:
with self.subTest("Testing CAN pair", id_a=id_a, id_b=id_b):
self._test_controller_pair(id_a, id_b)
def _test_controller_pair(self, id_a, id_b):
# Setting up each CAN peripheral independently is deliberate here, to catch
# catch cases where initialising CAN2 breaks CAN1 or vice versa
try:
can_a = CAN(id_a, CAN.LOOPBACK)
if IS_CLASSIC:
can_a.setfilter(0, CAN.LIST16, 0, (123, 124, 125, 126))
else:
can_a.setfilter(0, CAN.RANGE, 0, (123, 126))
can_b = CAN(id_b, CAN.LOOPBACK)
if IS_CLASSIC:
can_b.setfilter(0, CAN.LIST16, 0, (3, 4, 5, 6))
else:
can_b.setfilter(0, CAN.RANGE, 0, (3, 6))
# Drain any old messages in RX FIFOs
for can in (can_a, can_b):
while can.any(0):
can.recv(0)
for which, id, can in (("A", id_a, can_a), ("B", id_b, can_b)):
print("testing config", which, "with controller", can)
# message1 should only receive on can_a, message2 on can_b
can.send("message1", 123)
can.send("message2", 3)
n_recv = 0
try:
while True:
res = can.recv(0, timeout=50)
n_recv += 1
print("received", res)
if can == can_a:
self.assertEqual(
res[4], b"message1", "can_a should receive message1 only"
)
if can == can_b:
self.assertEqual(
res[4], b"message2", "can_b should receive message2 only"
)
except OSError as e:
if e.errno != errno.ETIMEDOUT:
raise
print("recv timed out")
self.assertEqual(n_recv, 1, "Each instance should receive exactly 1 message")
finally:
can_a.deinit()
can_b.deinit()
if __name__ == "__main__":
unittest.main()