extmod/machine_uart: Factor ports' UART Python bindings to common code.

This is a code factoring to have the Python bindings in one location, and
all the ports use those same bindings.  For all ports except the two listed
below there is no functional change.

The nrf port has UART.sendbreak() removed, but this method previously did
nothing.

The zephyr port has the following methods added:
- UART.init(): supports setting timeout and timeout_char.
- UART.deinit(): does nothing, just returns None.
- UART.flush(): raises OSError(EINVAL) because it's not implemented.
- UART.any() and UART.txdone(): raise NotImplementedError.

Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
Damien George
2023-10-10 23:46:07 +11:00
parent 95d8b5fd55
commit 5b4a2baff6
65 changed files with 661 additions and 996 deletions

View File

@@ -3,7 +3,7 @@
*
* The MIT License (MIT)
*
* Copyright (c) 2013-2018 Damien P. George
* Copyright (c) 2013-2023 Damien P. George
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,12 +24,11 @@
* THE SOFTWARE.
*/
#include <stdio.h>
#include <string.h>
#include <stdarg.h>
// This file is never compiled standalone, it's included directly from
// extmod/machine_uart.c via MICROPY_PY_MACHINE_UART_INCLUDEFILE.
#include <string.h>
#include "py/runtime.h"
#include "py/stream.h"
#include "py/mperrno.h"
#include "py/mphal.h"
#include "shared/runtime/interrupt_char.h"
@@ -38,43 +37,13 @@
#include "irq.h"
#include "pendsv.h"
/// \moduleref pyb
/// \class UART - duplex serial communication bus
///
/// UART implements the standard UART/USART duplex serial communications protocol. At
/// the physical level it consists of 2 lines: RX and TX. The unit of communication
/// is a character (not to be confused with a string character) which can be 8 or 9
/// bits wide.
///
/// UART objects can be created and initialised using:
///
/// from pyb import UART
///
/// uart = UART(1, 9600) # init with given baudrate
/// uart.init(9600, bits=8, parity=None, stop=1) # init with given parameters
///
/// Bits can be 8 or 9. Parity can be None, 0 (even) or 1 (odd). Stop can be 1 or 2.
///
/// A UART object acts like a stream object and reading and writing is done
/// using the standard stream methods:
///
/// uart.read(10) # read 10 characters, returns a bytes object
/// uart.read() # read all available characters
/// uart.readline() # read a line
/// uart.readinto(buf) # read and store into the given buffer
/// uart.write('abc') # write the 3 characters
///
/// Individual characters can be read/written using:
///
/// uart.readchar() # read 1 character and returns it as an integer
/// uart.writechar(42) # write 1 character
///
/// To check if there is anything to be read, use:
///
/// uart.any() # returns True if any characters waiting
#define MICROPY_PY_MACHINE_UART_CLASS_CONSTANTS \
{ MP_ROM_QSTR(MP_QSTR_RTS), MP_ROM_INT(UART_HWCONTROL_RTS) }, \
{ MP_ROM_QSTR(MP_QSTR_CTS), MP_ROM_INT(UART_HWCONTROL_CTS) }, \
{ MP_ROM_QSTR(MP_QSTR_IRQ_RXIDLE), MP_ROM_INT(UART_FLAG_IDLE) }, \
STATIC void pyb_uart_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) {
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
STATIC void mp_machine_uart_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) {
machine_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
if (!self->is_enabled) {
#if defined(LPUART1)
if (self->uart_id == PYB_LPUART_1) {
@@ -171,7 +140,7 @@ STATIC void pyb_uart_print(const mp_print_t *print, mp_obj_t self_in, mp_print_k
/// - `timeout_char` is the timeout in milliseconds to wait between characters.
/// - `flow` is RTS | CTS where RTS == 256, CTS == 512
/// - `read_buf_len` is the character length of the read buffer (0 to disable).
STATIC mp_obj_t pyb_uart_init_helper(pyb_uart_obj_t *self, size_t n_args, const mp_obj_t *pos_args, mp_map_t *kw_args) {
STATIC void mp_machine_uart_init_helper(machine_uart_obj_t *self, size_t n_args, const mp_obj_t *pos_args, mp_map_t *kw_args) {
static const mp_arg_t allowed_args[] = {
{ MP_QSTR_baudrate, MP_ARG_REQUIRED | MP_ARG_INT, {.u_int = 9600} },
{ MP_QSTR_bits, MP_ARG_INT, {.u_int = 8} },
@@ -292,8 +261,6 @@ STATIC mp_obj_t pyb_uart_init_helper(pyb_uart_obj_t *self, size_t n_args, const
if (20 * baudrate_diff > actual_baudrate) {
mp_raise_msg_varg(&mp_type_ValueError, MP_ERROR_TEXT("set baudrate %d is not within 5%% of desired value"), actual_baudrate);
}
return mp_const_none;
}
/// \classmethod \constructor(bus, ...)
@@ -311,7 +278,7 @@ STATIC mp_obj_t pyb_uart_init_helper(pyb_uart_obj_t *self, size_t n_args, const
/// - `UART(6)` is on `YA`: `(TX, RX) = (Y1, Y2) = (PC6, PC7)`
/// - `UART(3)` is on `YB`: `(TX, RX) = (Y9, Y10) = (PB10, PB11)`
/// - `UART(2)` is on: `(TX, RX) = (X3, X4) = (PA2, PA3)`
STATIC mp_obj_t pyb_uart_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
STATIC mp_obj_t mp_machine_uart_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
// check arguments
mp_arg_check_num(n_args, n_kw, 1, MP_OBJ_FUN_ARGS_MAX, true);
@@ -391,59 +358,56 @@ STATIC mp_obj_t pyb_uart_make_new(const mp_obj_type_t *type, size_t n_args, size
mp_raise_msg_varg(&mp_type_ValueError, MP_ERROR_TEXT("UART(%d) is reserved"), uart_id);
}
pyb_uart_obj_t *self;
if (MP_STATE_PORT(pyb_uart_obj_all)[uart_id - 1] == NULL) {
machine_uart_obj_t *self;
if (MP_STATE_PORT(machine_uart_obj_all)[uart_id - 1] == NULL) {
// create new UART object
self = m_new0(pyb_uart_obj_t, 1);
self->base.type = &pyb_uart_type;
self = m_new0(machine_uart_obj_t, 1);
self->base.type = &machine_uart_type;
self->uart_id = uart_id;
MP_STATE_PORT(pyb_uart_obj_all)[uart_id - 1] = self;
MP_STATE_PORT(machine_uart_obj_all)[uart_id - 1] = self;
} else {
// reference existing UART object
self = MP_STATE_PORT(pyb_uart_obj_all)[uart_id - 1];
self = MP_STATE_PORT(machine_uart_obj_all)[uart_id - 1];
}
if (n_args > 1 || n_kw > 0) {
// start the peripheral
mp_map_t kw_args;
mp_map_init_fixed_table(&kw_args, n_kw, args + n_args);
pyb_uart_init_helper(self, n_args - 1, args + 1, &kw_args);
mp_machine_uart_init_helper(self, n_args - 1, args + 1, &kw_args);
}
return MP_OBJ_FROM_PTR(self);
}
STATIC mp_obj_t pyb_uart_init(size_t n_args, const mp_obj_t *args, mp_map_t *kw_args) {
return pyb_uart_init_helper(MP_OBJ_TO_PTR(args[0]), n_args - 1, args + 1, kw_args);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_KW(pyb_uart_init_obj, 1, pyb_uart_init);
/// \method deinit()
/// Turn off the UART bus.
STATIC mp_obj_t pyb_uart_deinit(mp_obj_t self_in) {
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
// Turn off the UART bus.
STATIC void mp_machine_uart_deinit(machine_uart_obj_t *self) {
uart_deinit(self);
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(pyb_uart_deinit_obj, pyb_uart_deinit);
/// \method any()
/// Return `True` if any characters waiting, else `False`.
STATIC mp_obj_t pyb_uart_any(mp_obj_t self_in) {
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
return MP_OBJ_NEW_SMALL_INT(uart_rx_any(self));
// Return number of characters waiting.
STATIC mp_int_t mp_machine_uart_any(machine_uart_obj_t *self) {
return uart_rx_any(self);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(pyb_uart_any_obj, pyb_uart_any);
/// \method writechar(char)
/// Write a single character on the bus. `char` is an integer to write.
/// Return value: `None`.
STATIC mp_obj_t pyb_uart_writechar(mp_obj_t self_in, mp_obj_t char_in) {
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
// Since uart.write() waits up to the last byte, uart.txdone() always returns True.
STATIC bool mp_machine_uart_txdone(machine_uart_obj_t *self) {
(void)self;
return true;
}
// get the character to write (might be 9 bits)
uint16_t data = mp_obj_get_int(char_in);
// Send a break condition.
STATIC void mp_machine_uart_sendbreak(machine_uart_obj_t *self) {
#if defined(STM32F0) || defined(STM32F7) || defined(STM32G0) || defined(STM32G4) || defined(STM32H5) || defined(STM32H7) || defined(STM32L0) || defined(STM32L4) || defined(STM32WB) || defined(STM32WL)
self->uartx->RQR = USART_RQR_SBKRQ; // write-only register
#else
self->uartx->CR1 |= USART_CR1_SBK;
#endif
}
// Write a single character on the bus. `data` is an integer to write.
// The `data` can be up to 9 bits.
STATIC void mp_machine_uart_writechar(machine_uart_obj_t *self, uint16_t data) {
// write the character
int errcode;
if (uart_tx_wait(self, self->timeout)) {
@@ -455,49 +419,26 @@ STATIC mp_obj_t pyb_uart_writechar(mp_obj_t self_in, mp_obj_t char_in) {
if (errcode != 0) {
mp_raise_OSError(errcode);
}
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_2(pyb_uart_writechar_obj, pyb_uart_writechar);
/// \method readchar()
/// Receive a single character on the bus.
/// Return value: The character read, as an integer. Returns -1 on timeout.
STATIC mp_obj_t pyb_uart_readchar(mp_obj_t self_in) {
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
// Receive a single character on the bus.
// Return value: The character read, as an integer. Returns -1 on timeout.
STATIC mp_int_t mp_machine_uart_readchar(machine_uart_obj_t *self) {
if (uart_rx_wait(self, self->timeout)) {
return MP_OBJ_NEW_SMALL_INT(uart_rx_char(self));
return uart_rx_char(self);
} else {
// return -1 on timeout
return MP_OBJ_NEW_SMALL_INT(-1);
return -1;
}
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(pyb_uart_readchar_obj, pyb_uart_readchar);
// uart.sendbreak()
STATIC mp_obj_t pyb_uart_sendbreak(mp_obj_t self_in) {
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
#if defined(STM32F0) || defined(STM32F7) || defined(STM32G0) || defined(STM32G4) || defined(STM32H5) || defined(STM32H7) || defined(STM32L0) || defined(STM32L4) || defined(STM32WB) || defined(STM32WL)
self->uartx->RQR = USART_RQR_SBKRQ; // write-only register
#else
self->uartx->CR1 |= USART_CR1_SBK;
#endif
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(pyb_uart_sendbreak_obj, pyb_uart_sendbreak);
// irq(handler, trigger, hard)
STATIC mp_obj_t pyb_uart_irq(size_t n_args, const mp_obj_t *pos_args, mp_map_t *kw_args) {
mp_arg_val_t args[MP_IRQ_ARG_INIT_NUM_ARGS];
mp_arg_parse_all(n_args - 1, pos_args + 1, kw_args, MP_IRQ_ARG_INIT_NUM_ARGS, mp_irq_init_args, args);
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(pos_args[0]);
STATIC mp_irq_obj_t *mp_machine_uart_irq(machine_uart_obj_t *self, bool any_args, mp_arg_val_t *args) {
if (self->mp_irq_obj == NULL) {
self->mp_irq_trigger = 0;
self->mp_irq_obj = mp_irq_new(&uart_irq_methods, MP_OBJ_FROM_PTR(self));
}
if (n_args > 1 || kw_args->used != 0) {
if (any_args) {
// Check the handler
mp_obj_t handler = args[MP_IRQ_ARG_INIT_handler].u_obj;
if (handler != mp_const_none && !mp_obj_is_callable(handler)) {
@@ -519,51 +460,11 @@ STATIC mp_obj_t pyb_uart_irq(size_t n_args, const mp_obj_t *pos_args, mp_map_t *
uart_irq_config(self, true);
}
return MP_OBJ_FROM_PTR(self->mp_irq_obj);
return self->mp_irq_obj;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_KW(pyb_uart_irq_obj, 1, pyb_uart_irq);
// Since uart.write() waits up to the last byte, uart.txdone() always returns True.
STATIC mp_obj_t machine_uart_txdone(mp_obj_t self_in) {
return mp_const_true;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(machine_uart_txdone_obj, machine_uart_txdone);
STATIC const mp_rom_map_elem_t pyb_uart_locals_dict_table[] = {
// instance methods
{ MP_ROM_QSTR(MP_QSTR_init), MP_ROM_PTR(&pyb_uart_init_obj) },
{ MP_ROM_QSTR(MP_QSTR_deinit), MP_ROM_PTR(&pyb_uart_deinit_obj) },
{ MP_ROM_QSTR(MP_QSTR_any), MP_ROM_PTR(&pyb_uart_any_obj) },
/// \method read([nbytes])
{ MP_ROM_QSTR(MP_QSTR_read), MP_ROM_PTR(&mp_stream_read_obj) },
/// \method readline()
{ MP_ROM_QSTR(MP_QSTR_readline), MP_ROM_PTR(&mp_stream_unbuffered_readline_obj)},
/// \method readinto(buf[, nbytes])
{ MP_ROM_QSTR(MP_QSTR_readinto), MP_ROM_PTR(&mp_stream_readinto_obj) },
/// \method write(buf)
{ MP_ROM_QSTR(MP_QSTR_write), MP_ROM_PTR(&mp_stream_write_obj) },
{ MP_ROM_QSTR(MP_QSTR_flush), MP_ROM_PTR(&mp_stream_flush_obj) },
{ MP_ROM_QSTR(MP_QSTR_irq), MP_ROM_PTR(&pyb_uart_irq_obj) },
{ MP_ROM_QSTR(MP_QSTR_writechar), MP_ROM_PTR(&pyb_uart_writechar_obj) },
{ MP_ROM_QSTR(MP_QSTR_readchar), MP_ROM_PTR(&pyb_uart_readchar_obj) },
{ MP_ROM_QSTR(MP_QSTR_sendbreak), MP_ROM_PTR(&pyb_uart_sendbreak_obj) },
{ MP_ROM_QSTR(MP_QSTR_txdone), MP_ROM_PTR(&machine_uart_txdone_obj) },
// class constants
{ MP_ROM_QSTR(MP_QSTR_RTS), MP_ROM_INT(UART_HWCONTROL_RTS) },
{ MP_ROM_QSTR(MP_QSTR_CTS), MP_ROM_INT(UART_HWCONTROL_CTS) },
// IRQ flags
{ MP_ROM_QSTR(MP_QSTR_IRQ_RXIDLE), MP_ROM_INT(UART_FLAG_IDLE) },
};
STATIC MP_DEFINE_CONST_DICT(pyb_uart_locals_dict, pyb_uart_locals_dict_table);
STATIC mp_uint_t pyb_uart_read(mp_obj_t self_in, void *buf_in, mp_uint_t size, int *errcode) {
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
STATIC mp_uint_t mp_machine_uart_read(mp_obj_t self_in, void *buf_in, mp_uint_t size, int *errcode) {
machine_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
byte *buf = buf_in;
// check that size is a multiple of character width
@@ -604,8 +505,8 @@ STATIC mp_uint_t pyb_uart_read(mp_obj_t self_in, void *buf_in, mp_uint_t size, i
}
}
STATIC mp_uint_t pyb_uart_write(mp_obj_t self_in, const void *buf_in, mp_uint_t size, int *errcode) {
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
STATIC mp_uint_t mp_machine_uart_write(mp_obj_t self_in, const void *buf_in, mp_uint_t size, int *errcode) {
machine_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
const byte *buf = buf_in;
// check that size is a multiple of character width
@@ -631,8 +532,8 @@ STATIC mp_uint_t pyb_uart_write(mp_obj_t self_in, const void *buf_in, mp_uint_t
}
}
STATIC mp_uint_t pyb_uart_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_t arg, int *errcode) {
pyb_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
STATIC mp_uint_t mp_machine_uart_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_t arg, int *errcode) {
machine_uart_obj_t *self = MP_OBJ_TO_PTR(self_in);
mp_uint_t ret;
if (request == MP_STREAM_POLL) {
uintptr_t flags = arg;
@@ -652,20 +553,3 @@ STATIC mp_uint_t pyb_uart_ioctl(mp_obj_t self_in, mp_uint_t request, uintptr_t a
}
return ret;
}
STATIC const mp_stream_p_t uart_stream_p = {
.read = pyb_uart_read,
.write = pyb_uart_write,
.ioctl = pyb_uart_ioctl,
.is_text = false,
};
MP_DEFINE_CONST_OBJ_TYPE(
pyb_uart_type,
MP_QSTR_UART,
MP_TYPE_FLAG_ITER_IS_STREAM,
make_new, pyb_uart_make_new,
print, pyb_uart_print,
protocol, &uart_stream_p,
locals_dict, &pyb_uart_locals_dict
);