Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,21 @@ Low-level socket operations

This method is a :ref:`coroutine <coroutine>`.

.. coroutinemethod:: AbstractEventLoop.sock_recv_into(sock, buf)

Receive data from the socket. Modeled after blocking
:meth:`socket.socket.recv_into` method.

The received data is written into *buf* (a writable buffer).
The return value is the number of bytes written.

With :class:`SelectorEventLoop` event loop, the socket *sock* must be
non-blocking.

This method is a :ref:`coroutine <coroutine>`.

.. versionadded:: 3.7

.. coroutinemethod:: AbstractEventLoop.sock_sendall(sock, data)

Send data to the socket. Modeled after blocking
Expand Down
3 changes: 3 additions & 0 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ def remove_writer(self, fd):
def sock_recv(self, sock, nbytes):
raise NotImplementedError

def sock_recv_into(self, sock, buf):
raise NotImplementedError

def sock_sendall(self, sock, data):
raise NotImplementedError

Expand Down
3 changes: 3 additions & 0 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,9 @@ def close(self):
def sock_recv(self, sock, n):
return self._proactor.recv(sock, n)

def sock_recv_into(self, sock, buf):
return self._proactor.recv_into(sock, buf)

def sock_sendall(self, sock, data):
return self._proactor.send(sock, data)

Expand Down
35 changes: 35 additions & 0 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,41 @@ def _sock_recv(self, fut, registered, sock, n):
else:
fut.set_result(data)

def sock_recv_into(self, sock, buf):
"""Receive data from the socket.

The received data is written into *buf* (a writable buffer).
The return value is the number of bytes written.

This method is a coroutine.
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = self.create_future()
self._sock_recv_into(fut, False, sock, buf)
return fut
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want, you can further optimize this by transforming sock_recv_into into an async def coroutine, and creating the fut object only when sock.recv_into(buf) fails with a BlockingError.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's very useful, as people will mainly want to use that for large data. Right now the implementation mirrors that of sock_recv, which is easier to understand.


def _sock_recv_into(self, fut, registered, sock, buf):
# _sock_recv_into() can add itself as an I/O callback if the operation
# can't be done immediately. Don't use it directly, call sock_recv_into().
fd = sock.fileno()
if registered:
# Remove the callback early. It should be rare that the
# selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
self.remove_reader(fd)
if fut.cancelled():
return
try:
nbytes = sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
except Exception as exc:
fut.set_exception(exc)
else:
fut.set_result(nbytes)

def sock_sendall(self, sock, data):
"""Send data to the socket.

Expand Down
22 changes: 22 additions & 0 deletions Lib/asyncio/windows_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,28 @@ def finish_recv(trans, key, ov):

return self._register(ov, conn, finish_recv)

def recv_into(self, conn, buf, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
try:
if isinstance(conn, socket.socket):
ov.WSARecvInto(conn.fileno(), buf, flags)
else:
ov.ReadFileInto(conn.fileno(), buf)
except BrokenPipeError:
return self._result(b'')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc says "The return value is the number of bytes written." but here an empty byte string is returned! I created https://bugs.python.org/issue41467 to track this bug.


def finish_recv(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
raise ConnectionResetError(*exc.args)
else:
raise

return self._register(ov, conn, finish_recv)

def send(self, conn, buf, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
Expand Down
26 changes: 26 additions & 0 deletions Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ def _basetest_sock_client_ops(self, httpd, sock):
with self.assertRaises(ValueError):
self.loop.run_until_complete(
self.loop.sock_recv(sock, 1024))
with self.assertRaises(ValueError):
self.loop.run_until_complete(
self.loop.sock_recv_into(sock, bytearray()))
with self.assertRaises(ValueError):
self.loop.run_until_complete(
self.loop.sock_accept(sock))
Expand All @@ -443,16 +446,37 @@ def _basetest_sock_client_ops(self, httpd, sock):
sock.close()
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))

def _basetest_sock_recv_into(self, httpd, sock):
# same as _basetest_sock_client_ops, but using sock_recv_into
sock.setblocking(False)
self.loop.run_until_complete(
self.loop.sock_connect(sock, httpd.address))
self.loop.run_until_complete(
self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
data = bytearray(1024)
with memoryview(data) as buf:
nbytes = self.loop.run_until_complete(
self.loop.sock_recv_into(sock, buf[:1024]))
# consume data
self.loop.run_until_complete(
self.loop.sock_recv_into(sock, buf[nbytes:]))
sock.close()
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))

def test_sock_client_ops(self):
with test_utils.run_test_server() as httpd:
sock = socket.socket()
self._basetest_sock_client_ops(httpd, sock)
sock = socket.socket()
self._basetest_sock_recv_into(httpd, sock)

@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_unix_sock_client_ops(self):
with test_utils.run_test_unix_server() as httpd:
sock = socket.socket(socket.AF_UNIX)
self._basetest_sock_client_ops(httpd, sock)
sock = socket.socket(socket.AF_UNIX)
self._basetest_sock_recv_into(httpd, sock)

def test_sock_client_fail(self):
# Make sure that we will get an unused port
Expand Down Expand Up @@ -2612,6 +2636,8 @@ def test_not_implemented(self):
NotImplementedError, loop.remove_writer, 1)
self.assertRaises(
NotImplementedError, loop.sock_recv, f, 10)
self.assertRaises(
NotImplementedError, loop.sock_recv_into, f, 10)
self.assertRaises(
NotImplementedError, loop.sock_sendall, f, 10)
self.assertRaises(
Expand Down
5 changes: 5 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ def test_sock_recv(self):
self.loop.sock_recv(self.sock, 1024)
self.proactor.recv.assert_called_with(self.sock, 1024)

def test_sock_recv_into(self):
buf = bytearray(10)
self.loop.sock_recv_into(self.sock, buf)
self.proactor.recv_into.assert_called_with(self.sock, buf)

def test_sock_sendall(self):
self.loop.sock_sendall(self.sock, b'data')
self.proactor.send.assert_called_with(self.sock, b'data')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add AbstractEventLoop.sock_recv_into().
Loading