Files
micropython/tests/multi_extmod/machine_can_02_rx_callback.py
T
Angus Gratton 6cac2d275d
JavaScript code lint and formatting with Biome / eslint (push) Has been cancelled
Check code formatting / code-formatting (push) Has been cancelled
Check spelling with codespell / codespell (push) Has been cancelled
Build docs / build (push) Has been cancelled
Check examples / embedding (push) Has been cancelled
Package mpremote / build (push) Has been cancelled
.mpy file format and tools / test (push) Has been cancelled
Build ports metadata / build (push) Has been cancelled
alif port / build_alif (alif_ae3_build) (push) Has been cancelled
cc3200 port / build (push) Has been cancelled
esp32 port / build_idf (esp32_build_c2_c5_c6) (push) Has been cancelled
esp32 port / build_idf (esp32_build_cmod_spiram_s2) (push) Has been cancelled
esp32 port / build_idf (esp32_build_p4) (push) Has been cancelled
esp32 port / build_idf (esp32_build_s3_c3) (push) Has been cancelled
esp8266 port / build (push) Has been cancelled
mimxrt port / build (push) Has been cancelled
nrf port / build (push) Has been cancelled
powerpc port / build (push) Has been cancelled
qemu port / build_and_test_arm (bigendian) (push) Has been cancelled
qemu port / build_and_test_arm (sabrelite) (push) Has been cancelled
qemu port / build_and_test_arm (thumb_hardfp) (push) Has been cancelled
qemu port / build_and_test_arm (thumb_softfp) (push) Has been cancelled
qemu port / build_and_test_rv32 (push) Has been cancelled
qemu port / build_and_test_rv64 (push) Has been cancelled
renesas-ra port / build_renesas_ra_board (push) Has been cancelled
rp2 port / build (push) Has been cancelled
samd port / build (push) Has been cancelled
stm32 port / build_stm32 (stm32_misc_build) (push) Has been cancelled
stm32 port / build_stm32 (stm32_nucleo_build) (push) Has been cancelled
stm32 port / build_stm32 (stm32_pyb_build) (push) Has been cancelled
unix port / minimal (push) Has been cancelled
unix port / reproducible (push) Has been cancelled
unix port / standard (push) Has been cancelled
unix port / standard_v2 (push) Has been cancelled
unix port / coverage (push) Has been cancelled
unix port / coverage_32bit (push) Has been cancelled
unix port / nanbox (push) Has been cancelled
unix port / longlong (push) Has been cancelled
unix port / float (push) Has been cancelled
unix port / gil_enabled (push) Has been cancelled
unix port / stackless_clang (push) Has been cancelled
unix port / float_clang (push) Has been cancelled
unix port / settrace_stackless (push) Has been cancelled
unix port / repr_b (push) Has been cancelled
unix port / macos (push) Has been cancelled
unix port / qemu_mips (push) Has been cancelled
unix port / qemu_arm (push) Has been cancelled
unix port / qemu_riscv64 (push) Has been cancelled
unix port / sanitize_address (push) Has been cancelled
unix port / sanitize_undefined (push) Has been cancelled
webassembly port / build (push) Has been cancelled
windows port / build-vs (Debug, true, x64, dev, 2017, [15, 16)) (push) Has been cancelled
windows port / build-vs (Debug, true, x86, dev, 2017, [15, 16)) (push) Has been cancelled
windows port / build-vs (Debug, x64, dev, 2022, [17, 18)) (push) Has been cancelled
windows port / build-vs (Debug, x86, dev, 2022, [17, 18)) (push) Has been cancelled
windows port / build-vs (Release, true, x64, dev, 2017, [15, 16)) (push) Has been cancelled
windows port / build-vs (Release, true, x64, dev, 2019, [16, 17)) (push) Has been cancelled
windows port / build-vs (Release, true, x64, standard, 2017, [15, 16)) (push) Has been cancelled
windows port / build-vs (Release, true, x64, standard, 2019, [16, 17)) (push) Has been cancelled
windows port / build-vs (Release, true, x86, dev, 2017, [15, 16)) (push) Has been cancelled
windows port / build-vs (Release, true, x86, dev, 2019, [16, 17)) (push) Has been cancelled
windows port / build-vs (Release, true, x86, standard, 2017, [15, 16)) (push) Has been cancelled
windows port / build-vs (Release, true, x86, standard, 2019, [16, 17)) (push) Has been cancelled
windows port / build-vs (Release, x64, dev, 2022, [17, 18)) (push) Has been cancelled
windows port / build-vs (Release, x64, standard, 2022, [17, 18)) (push) Has been cancelled
windows port / build-vs (Release, x86, dev, 2022, [17, 18)) (push) Has been cancelled
windows port / build-vs (Release, x86, standard, 2022, [17, 18)) (push) Has been cancelled
windows port / build-mingw (i686, mingw32, dev) (push) Has been cancelled
windows port / build-mingw (i686, mingw32, standard) (push) Has been cancelled
windows port / build-mingw (x86_64, mingw64, dev) (push) Has been cancelled
windows port / build-mingw (x86_64, mingw64, standard) (push) Has been cancelled
windows port / cross-build-on-linux (push) Has been cancelled
zephyr port / build (push) Has been cancelled
Python code lint and formatting with ruff / ruff (push) Has been cancelled
stm32: Add machine.CAN implementation.
Implemented according to API docs in a parent comment.

Adds new multi_extmod/machine_can_* tests which pass when testing between
NUCLEO_G474RE, NUCLEO_H723ZG and PYBDV11.

This work was mostly funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2026-03-19 17:36:50 +11:00

123 lines
3.7 KiB
Python

from machine import CAN
import time
# Test the CAN.IRQ_RX irq handler, including overflow
rx_overflow = False
rx_full = False
received = []
# CAN IDs
ID_SPAM = 0x345 # messages spammed into the receive FIFO
ID_ACK_OFLOW = 0x055 # message the receiver sends after it's seen an overflow
ID_AFTER = 0x100 # message the sender sends after the ACK
can = CAN(1, 500_000)
# A very basic "soft" receiver handler that stores received messages into a global list
def receiver_irq_recv(can):
global rx_overflow, rx_full
assert can.irq().flags() & can.IRQ_RX # the only enabled IRQ
can_id, data, _flags, errors = can.recv()
received.append((can_id, None))
# The FIFO is expected not to overflow by itself, wait until 40 messages
# have been received and then block the receive handler to induce an overflow
if len(received) == 40:
assert not rx_overflow # shouldn't have already happened, either
time.sleep_ms(500)
if not rx_overflow and (errors & CAN.RECV_ERR_OVERRUN):
# expected this should happen on the very next message after
# the one where we slept for 500ms
print("irq_recv overrun", len(received))
received.clear() # check we still get some messages, see rx_spam print line below
rx_overflow = True
# also expect the FIFO to be FULL again immediately after overrunning and rx_overflow event
if rx_overflow and (errors & CAN.RECV_ERR_OVERRUN | CAN.RECV_ERR_FULL) == CAN.RECV_ERR_FULL:
rx_full = True
# Receiver
def instance0():
can.irq(receiver_irq_recv, trigger=can.IRQ_RX, hard=False)
can.set_filters(None) # receive all
multitest.next()
while not rx_overflow:
pass # Resume ASAP after FIFO0 overflows
can.send(ID_ACK_OFLOW, b"overflow")
# at least one ID_SPAM message should have been received
# *after* we overflowed and 'received' was clear in the irq handler
print("rx_spam", any(r[0] == ID_SPAM for r in received))
# wait until the "after" message is received
for n in range(100):
if any(r[0] == ID_AFTER for r in received):
break
time.sleep_ms(10)
can.irq(None) # disable the IRQ
received.clear()
# at some point while waiting for ID_AFTER the FIFO should have gotten
# full again
print("rx_full", rx_full)
# now IRQ is disabled, no new messages should be received
time.sleep_ms(250)
print("len", len(received))
received_ack = False
# reusing the result buffer so sender_irq_recv can be 'hard'
sender_irq_result = [None, memoryview(bytearray(64)), None, None]
def sender_irq_recv(can):
global received_ack
assert can.irq().flags() & can.IRQ_RX # the only enabled IRQ
can_id, data, _flags, _errors = can.recv(sender_irq_result)
print("sender_irq_recv", can_id, len(data)) # should be ID_ACK_OFLOW and "overflow" payload
received_ack = True
# Sender
def instance1():
can.irq(sender_irq_recv, CAN.IRQ_RX, hard=True)
can.set_filters(None)
multitest.next()
# Spam out messages until the receiver tells us its RX FIFO is full.
#
# The RX FIFO on the receiver can vary from 3 deep (BXCAN) to 25 deep (STM32H7),
# so we keep sending to it until we see a CAN message on ID_ACK_OFLOW indicating
# the receiver's FIFO has overflowed
while not received_ack:
for i in range(255):
while can.send(ID_SPAM, bytes([i] * 8)) is None and not received_ack:
# Don't overflow the TX FIFO
time.sleep_ms(1)
if received_ack:
break
# give the receiver some time to make space in the FIFO
time.sleep_ms(200)
# send the final message, the receiver should get this one
can.send(ID_AFTER, b"aaaaa")