Files
micropython/tests/multi_net/tcp_sendall.py
Alessandro Gatti 024178455d unix/modsocket: Add "sendall" method to socket class.
This commit adds an implementation for "socket.sendall" to the Unix
port.

Right now the implementation is a wrapper over a single "socket.send"
call, since it wasn't possible to let "socket.send" perform a partial
transfer.  With no partial transfers it is not possible to have a
testable implementation following the expected behaviour, so a single
send operation is done and OSError(EINTR) is raised if a partial transfer
occurs.

The discussion for the issue linked to this PR contains more information
about the efforts made to let partial transfers happen.

This fixes #16803.

Signed-off-by: Alessandro Gatti <a.gatti@frob.it>
2026-04-08 19:20:51 +10:00

72 lines
2.2 KiB
Python

# Simple test of a TCP server and client transferring data using sendall.
import socket
# Skip the whole test on ports whose socket class has no "sendall" method.
# NOTE(review): printing "SKIP" and exiting is presumably the convention the
# test runner uses to mark a test as skipped -- confirm against the runner.
if not hasattr(socket.socket, "sendall"):
    print("SKIP")
    raise SystemExit

# TCP port the server instance listens on and the client connects to.
PORT = 8000
# This is the smallest send buffer size supported by Linux, see socket(7).
SEND_SIZE = 1024
# The payload is SEND_SIZE * BUFFERS bytes long, i.e. several times larger
# than the send buffer size the client requests.
BUFFERS = 8
# G. D. Nguyen, "Fast CRCs", vol. 18, no. 10, pp. 1321-1331, Oct. 2009
# https://dl.acm.org/doi/abs/10.1109/TC.2009.83
def calculate_checksum(buffer):
    """Return a 16-bit checksum of *buffer*.

    *buffer* is any iterable of integer byte values (bytes, bytearray, ...).
    Each byte is XOR-ed into the running 16-bit value, which is then shifted
    left 16 times; whenever the top bit is set before a shift, the shifted
    value is XOR-ed with the polynomial 0x8005.  The result is always in the
    range 0..0xFFFF, and an empty buffer yields 0.
    """
    checksum = 0
    for byte in buffer:
        checksum = (checksum ^ byte) & 0xFFFF
        # The step counter itself is not needed, only the 16 repetitions.
        for _ in range(16):
            if checksum & 0x8000:
                checksum = ((checksum << 1) ^ 0x8005) & 0xFFFF
            else:
                checksum = (checksum << 1) & 0xFFFF
    return checksum
# Server
def instance0():
    """Server side: receive the payload and report its length and checksum.

    Listens on PORT, accepts one connection, reads SEND_SIZE * BUFFERS bytes,
    then sends back the received length as 8 hex digits followed by the
    checksum as 4 hex digits, and finally prints the number of bytes received.

    NOTE(review): ``multitest`` is not defined in this file; it is presumably
    injected by the multi-instance test runner -- confirm against the runner.
    """
    # Publish this instance's address so the other instance can read ``IP``.
    multitest.globals(IP=multitest.get_network_ip())
    s = socket.socket()
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1])
        s.listen()
        multitest.next()
        s2, _ = s.accept()
        try:
            expected_size = SEND_SIZE * BUFFERS
            buffer = b""
            while len(buffer) < expected_size:
                chunk = s2.recv(expected_size)
                if not chunk:
                    # The peer closed the connection early.  Without this
                    # check recv() keeps returning b"" and the loop never
                    # ends; stop and report what was actually received.
                    break
                buffer += chunk
            s2.send(b"%08x" % len(buffer))
            s2.send(b"%04x" % calculate_checksum(buffer))
        finally:
            s2.close()
    finally:
        # Release the listening socket even if something above raised.
        s.close()
    print("received {} bytes".format(len(buffer)))
# Client
def instance1():
    """Client side: send the payload with sendall and verify the server's reply.

    Connects to the server, sends SEND_SIZE * BUFFERS bytes in a single
    ``sendall`` call, then reads back the length (8 hex digits) and checksum
    (4 hex digits) reported by the server and raises ``Exception`` if either
    differs from what was sent.

    NOTE(review): ``multitest`` and ``IP`` are not defined in this file; they
    are presumably injected by the multi-instance test runner -- confirm.
    """

    def recv_exact(sock, count):
        # TCP is a byte stream, so one recv() call may return fewer bytes
        # than requested; keep reading until the whole field has arrived.
        data = b""
        while len(data) < count:
            chunk = sock.recv(count - len(data))
            if not chunk:
                raise Exception(
                    "connection closed after %d of %d bytes" % (len(data), count)
                )
            data += chunk
        return data

    multitest.next()
    s = socket.socket()
    try:
        s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
        # Try our best to trigger a partial send in socket.sendall
        if hasattr(socket, "SO_SNDBUF"):
            s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, SEND_SIZE)
        # Fill the payload with a repeating 0x00..0xFF pattern.
        buffer = bytearray(SEND_SIZE * BUFFERS)
        for index in range(len(buffer)):
            buffer[index] = index & 0xFF
        expected_checksum = calculate_checksum(buffer)
        s.sendall(buffer)
        print("sent {} bytes".format(len(buffer)))
        length = int(recv_exact(s, 8), 16)
        if length != len(buffer):
            raise Exception("sendall len fail (expected %d, got %d)" % (len(buffer), length))
        checksum = int(recv_exact(s, 4), 16)
        if checksum != expected_checksum:
            raise Exception(
                "sendall checksum fail (expected %04x, got %04x)" % (expected_checksum, checksum)
            )
    finally:
        # Close the socket on the failure paths as well as on success.
        s.close()