mirror of
https://github.com/micropython/micropython.git
synced 2026-05-01 05:10:15 +02:00
024178455d
This commit adds an implementation for "socket.sendall" to the Unix port. Right now the implementation is a wrapper over a single "socket.send" call, since it wasn't possible to let "socket.send" perform a partial transfer. With no partial transfers it is not possible to have a testable implementation following the expected behaviour, so a single send operation is done and OSErr(EINTR) is raised if a partial transfer occurs. The discussion for the issue linked to this PR contains more information about the efforts made to let partial transfers happen. This fixes #16803. Signed-off-by: Alessandro Gatti <a.gatti@frob.it>
72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
# Simple test of a TCP server and client transferring data using sendall.
|
|
|
|
import socket
|
|
|
|
if not hasattr(socket.socket, "sendall"):
|
|
print("SKIP")
|
|
raise SystemExit
|
|
|
|
PORT = 8000
|
|
|
|
# This is the smallest send buffer size supported by Linux, see socket(7).
|
|
SEND_SIZE = 1024
|
|
BUFFERS = 8
|
|
|
|
|
|
# G. D. Nguyen, "Fast CRCs", vol. 18, no. 10, pp. 1321-1331, Oct. 2009
|
|
# https://dl.acm.org/doi/abs/10.1109/TC.2009.83
|
|
def calculate_checksum(buffer):
|
|
checksum = 0
|
|
for byte in buffer:
|
|
checksum = (checksum ^ byte) & 0xFFFF
|
|
for step in range(16):
|
|
if (checksum & 0x8000) != 0:
|
|
checksum = ((checksum << 1) ^ 0x8005) & 0xFFFF
|
|
else:
|
|
checksum = (checksum << 1) & 0xFFFF
|
|
return checksum
|
|
|
|
|
|
# Server
|
|
def instance0():
|
|
multitest.globals(IP=multitest.get_network_ip())
|
|
s = socket.socket()
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
s.bind(socket.getaddrinfo("0.0.0.0", PORT)[0][-1])
|
|
s.listen()
|
|
multitest.next()
|
|
s2, _ = s.accept()
|
|
buffer = b""
|
|
while len(buffer) < SEND_SIZE * BUFFERS:
|
|
buffer += s2.recv(SEND_SIZE * BUFFERS)
|
|
s2.send(b"%08x" % len(buffer))
|
|
s2.send(b"%04x" % calculate_checksum(buffer))
|
|
s2.close()
|
|
s.close()
|
|
print("received {} bytes".format(len(buffer)))
|
|
|
|
|
|
# Client
|
|
def instance1():
|
|
multitest.next()
|
|
s = socket.socket()
|
|
s.connect(socket.getaddrinfo(IP, PORT)[0][-1])
|
|
# Try our best to trigger a partial send in socket.sendall
|
|
if hasattr(socket, "SO_SNDBUF"):
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, SEND_SIZE)
|
|
buffer = bytearray(SEND_SIZE * BUFFERS)
|
|
for index in range(len(buffer)):
|
|
buffer[index] = index & 0xFF
|
|
expected_checksum = calculate_checksum(buffer)
|
|
s.sendall(buffer)
|
|
print("sent {} bytes".format(len(buffer)))
|
|
length = int(s.recv(8), 16)
|
|
if length != len(buffer):
|
|
raise Exception("sendall len fail (expected %d, got %d)" % (len(buffer), length))
|
|
checksum = int(s.recv(4), 16)
|
|
if checksum != expected_checksum:
|
|
raise Exception(
|
|
"sendall checksum fail (expected %04x, got %04x)" % (expected_checksum, checksum)
|
|
)
|
|
s.close()
|