From e9084a17f6056561def46218dfa8c7b7629c8212 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Fri, 24 Mar 2017 21:45:05 -0400 Subject: [PATCH 01/42] bpo-29883: Add UDP to Windows Proactor Event Loop --- Doc/library/asyncio-eventloop.rst | 5 +- Lib/asyncio/proactor_events.py | 103 +++++++ Lib/asyncio/windows_events.py | 34 +++ Lib/test/test_asyncio/test_proactor_events.py | 76 +++++ Modules/overlapped.c | 275 +++++++++++++++++- 5 files changed, 487 insertions(+), 6 deletions(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index bf7c93a86fd041..571bc1db36da98 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -482,8 +482,6 @@ Opening network connections transport. If specified, *local_addr* and *remote_addr* should be omitted (must be :const:`None`). - On Windows, with :class:`ProactorEventLoop`, this method is not supported. - See :ref:`UDP echo client protocol ` and :ref:`UDP echo server protocol ` examples. @@ -491,6 +489,9 @@ Opening network connections The *family*, *proto*, *flags*, *reuse_address*, *reuse_port, *allow_broadcast*, and *sock* parameters were added. + .. versionchanged:: 3.8.0 + Added support for Windows. + .. coroutinemethod:: loop.create_unix_connection(protocol_factory, \ path=None, \*, ssl=None, sock=None, \ server_hostname=None, ssl_handshake_timeout=None) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index a849be1cc1479b..a8d8c231b47a57 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -11,6 +11,7 @@ import socket import warnings import signal +import collections from . import base_events from . import constants @@ -425,6 +426,102 @@ def _pipe_closed(self, fut): self.close() +class _ProactorDatagramTransport(_ProactorBasePipeTransport): + + def __init__(self, loop, sock, protocol, address=None, + waiter=None, extra=None): + super(_ProactorDatagramTransport, self).__init__(loop, sock, protocol, + waiter=waiter, + extra=extra) + self._address = address + # We don't need to call _protocol.connection_made() since our base + # constructor does it for us. + self._buffer = collections.deque() + self._loop.call_soon(self._loop_reading) + + def abort(self): + self._force_close(None) + + def sendto(self, data, addr=None): + if not isinstance(data, (bytes, bytearray, memoryview)): + raise TypeError('data argument must be byte-ish (%r)', + type(data)) + + if not data: + return + + if self._conn_lost and self._address: + # close() or force_close() has been called on the bound endpoint + return + + self._buffer.appendleft((data, addr)) + + if self._write_fut is None: + # No current write operations are active, kick one off + self._loop_writing() + else: + # A write operation is already kicked off + pass + + def _loop_writing(self, fut=None): + if self._conn_lost: + return + + assert fut is self._write_fut + if fut: + # We are in a _loop_writing() done callback, get the result + fut.result() + + if not self._buffer or (self._conn_lost and self._address): + # The connection has been closed + self._write_fut = None + return + + data, addr = self._buffer.pop() + + self._write_fut = None + try: + if self._address: + self._write_fut = self._loop._proactor.send(self._sock, data) + else: + self._write_fut = self._loop._proactor.sendto(self._sock, data, addr=addr) + except OSError as exc: + self._protocol.error_received(exc) + self._fatal_error(exc, 'Fatal error sending UDP datagram') + else: + self._write_fut.add_done_callback(self._loop_writing) + + def _loop_reading(self, fut=None): + if self._conn_lost: + return + + assert self._read_fut is fut + + if fut: + res = fut.result() + + if self._address: + data, addr = res, self._address + else: + data, addr = res + + self._protocol.datagram_received(data, addr) + + if self._conn_lost: + return + + try: + if self._address: + self._read_fut = self._loop._proactor.recv(self._sock, 4096) + else: + self._read_fut = self._loop._proactor.recvfrom(self._sock, 4096) + except OSError as exc: + self._protocol.error_received(exc) + self._fatal_error(exc, "Fatal error reading from UDP endpoint") + else: + self._read_fut.add_done_callback(self._loop_reading) + + class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport, _ProactorBaseWritePipeTransport, transports.Transport): @@ -506,10 +603,16 @@ def _make_ssl_transport( self, protocol, sslcontext, waiter, server_side, server_hostname, ssl_handshake_timeout=ssl_handshake_timeout) + _ProactorSocketTransport(self, rawsock, ssl_protocol, extra=extra, server=server) return ssl_protocol._app_transport + def _make_datagram_transport(self, sock, protocol, + address=None, waiter=None, extra=None): + return _ProactorDatagramTransport(self, sock, protocol, address, + waiter, extra) + def _make_duplex_pipe_transport(self, sock, protocol, waiter=None, extra=None): return _ProactorDuplexPipeTransport(self, diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 29750f18d80c46..c94147991a7480 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -469,6 +469,14 @@ def recv_into(self, conn, buf, flags=0): except BrokenPipeError: return self._result(b'') + def recvfrom(self, conn, nbytes, flags=0): + self._register_with_iocp(conn) + ov = _overlapped.Overlapped(NULL) + try: + ov.WSARecvFrom(conn.fileno(), nbytes, flags) + except BrokenPipeError: + return self._result((b'', (None, None))) + def finish_recv(trans, key, ov): try: return ov.getresult() @@ -481,6 +489,23 @@ def finish_recv(trans, key, ov): return self._register(ov, conn, finish_recv) + def sendto(self, conn, buf, flags=0, addr=None): + self._register_with_iocp(conn) + ov = _overlapped.Overlapped(NULL) + + ov.WSASendTo(conn.fileno(), buf, flags, addr) + + def finish_send(trans, key, ov): + try: + return ov.getresult() + except OSError as exc: + if exc.winerror == _overlapped.ERROR_NETNAME_DELETED: + raise ConnectionResetError(*exc.args) + else: + raise + + return self._register(ov, conn, finish_send) + def send(self, conn, buf, flags=0): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) @@ -530,6 +555,15 @@ async def accept_coro(future, conn): return future def connect(self, conn, address): + if conn.type == socket.SOCK_DGRAM: + # WSAConnect will complete immediately for UDP sockets so we don't + # need to register any IOCP operation + _overlapped.WSAConnect(conn.fileno(), address) + fut = self._loop.create_future() + + fut.set_result(None) + return fut + self._register_with_iocp(conn) # The socket needs to be locally bound before we call ConnectEx(). try: diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 5952ccccce0e5d..b7626810695b8b 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -12,6 +12,7 @@ from asyncio.proactor_events import _ProactorSocketTransport from asyncio.proactor_events import _ProactorWritePipeTransport from asyncio.proactor_events import _ProactorDuplexPipeTransport +from asyncio.proactor_events import _ProactorDatagramTransport from test import support from test.test_asyncio import utils as test_utils @@ -976,6 +977,81 @@ def test_sock_sendfile_not_regular_file(self): 0, None)) self.assertEqual(self.file.tell(), 0) + def datagram_transport(self): + self.protocol = make_test_protocol(asyncio.DatagramProtocol) + return self.loop._make_datagram_transport(self.sock, self.protocol) + + def test_make_datagram_transport(self): + tr = self.datagram_transport() + self.assertIsInstance(tr, _ProactorDatagramTransport) + close_transport(tr) + + def test_datagram_loop_writing(self): + tr = self.datagram_transport() + tr._buffer.appendleft((b'data', ('127.0.0.1', 12068))) + tr._loop_writing() + self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068)) + self.loop._proactor.sendto.return_value.add_done_callback.\ + assert_called_with(tr._loop_writing) + + close_transport(tr) + + def test_datagram_loop_reading(self): + tr = self.datagram_transport() + tr._loop_reading() + self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096) + self.assertFalse(self.protocol.datagram_received.called) + self.assertFalse(self.protocol.error_received.called) + + def test_datagram_loop_reading_data(self): + res = asyncio.Future(loop=self.loop) + res.set_result((b'data', ('127.0.0.1', 12068))) + + tr = self.datagram_transport() + tr._read_fut = res + tr._loop_reading(res) + self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096) + self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068)) + + def test_datagram_loop_reading_no_data(self): + res = asyncio.Future(loop=self.loop) + res.set_result((b'', ('127.0.0.1', 12068))) + + tr = self.datagram_transport() + self.assertRaises(AssertionError, tr._loop_reading, res) + + tr.close = mock.Mock() + tr._read_fut = res + tr._loop_reading(res) + self.assertFalse(self.loop._proactor.recvfrom.called) + self.assertFalse(self.protocol.error_received.called) + self.assertFalse(tr.close.called) + + def test_datagram_loop_reading_aborted(self): + err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError() + + tr = self.datagram_transport() + tr._fatal_error = mock.Mock() + tr._protocol.error_received = mock.Mock() + tr._loop_reading() + tr._protocol.error_received.assert_called_with(err) + tr._fatal_error.assert_called_with( + err, + 'Fatal error reading from UDP endpoint') + + def test_datagram_loop_writing_aborted(self): + err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError() + + tr = self.datagram_transport() + tr._fatal_error = mock.Mock() + tr._protocol.error_received = mock.Mock() + tr._protocol.error_received.assert_called_with(err) + tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068))) + tr._loop_writing() + tr._fatal_error.assert_called_with( + err, + 'Fatal error sending UDP datagram') + if __name__ == '__main__': unittest.main() diff --git a/Modules/overlapped.c b/Modules/overlapped.c index e5a209bf758297..0ebac151c9fee9 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -39,7 +39,7 @@ enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_READINTO, TYPE_WRITE, TYPE_ACCEPT, TYPE_CONNECT, TYPE_DISCONNECT, TYPE_CONNECT_NAMED_PIPE, - TYPE_WAIT_NAMED_PIPE_AND_CONNECT, TYPE_TRANSMIT_FILE}; + TYPE_WAIT_NAMED_PIPE_AND_CONNECT, TYPE_TRANSMIT_FILE, TYPE_READ_FROM, TYPE_WRITE_TO}; typedef struct { PyObject_HEAD @@ -53,8 +53,18 @@ typedef struct { union { /* Buffer allocated by us: TYPE_READ and TYPE_ACCEPT */ PyObject *allocated_buffer; - /* Buffer passed by the user: TYPE_WRITE and TYPE_READINTO */ + /* Buffer passed by the user: TYPE_WRITE, TYPE_WRITE_TO, and TYPE_READINTO */ Py_buffer user_buffer; + + /* Data used for reading from a connection-less socket: TYPE_READ_FROM */ + struct { + // A (buffer, (host, port)) tuple + PyObject *result; + // The actual read buffer + PyObject *buffer; + struct sockaddr_in6 address; + int address_length; + } read_from; }; } OverlappedObject; @@ -574,7 +584,22 @@ Overlapped_clear(OverlappedObject *self) case TYPE_ACCEPT: Py_CLEAR(self->allocated_buffer); break; + case TYPE_READ_FROM: + // An initial call to WSARecvFrom will only allocate the buffer. + // The result tuple of (message, (address, port)) is only + // allocated _after_ a message has been received. + if(self->read_from.result) { + // We've received a message, free the result tuple which will + // also free the message buffer. + Py_CLEAR(self->read_from.result); + self->read_from.buffer = NULL; + } else if(self->read_from.buffer) { + // We haven't received a message, only free the buffer. + Py_CLEAR(self->read_from.buffer); + } + break; case TYPE_WRITE: + case TYPE_WRITE_TO: case TYPE_READINTO: if (self->user_buffer.obj) { PyBuffer_Release(&self->user_buffer); @@ -627,6 +652,31 @@ Overlapped_dealloc(OverlappedObject *self) SetLastError(olderr); } +static PyObject* +unparse_address(LPSOCKADDR Address, DWORD Length) { + // An IPv6 address has a maximum length of 39 characters + char AddressString[47]; + unsigned int port; + PVOID pSinAddr; + + switch(Address->sa_family) { + case AF_INET: + port = ntohs(((SOCKADDR_IN*)Address)->sin_port); + pSinAddr = &((SOCKADDR_IN*)Address)->sin_addr; + break; + case AF_INET6: + port = ntohs(((SOCKADDR_IN6*)Address)->sin6_port); + pSinAddr = &((SOCKADDR_IN6*)Address)->sin6_addr; + break; + default: + return SetFromWindowsErr(ERROR_INVALID_PARAMETER); + } + + inet_ntop(Address->sa_family, pSinAddr, AddressString, 47); + + return Py_BuildValue("(sH)", AddressString, port); +} + PyDoc_STRVAR( Overlapped_cancel_doc, "cancel() -> None\n\n" @@ -697,6 +747,8 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) case ERROR_BROKEN_PIPE: if (self->type == TYPE_READ || self->type == TYPE_READINTO) break; + else if(self->type == TYPE_READ_FROM && (self->read_from.result != NULL || self->read_from.buffer != NULL)) + break; /* fall through */ default: return SetFromWindowsErr(err); @@ -708,8 +760,28 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) if (transferred != PyBytes_GET_SIZE(self->allocated_buffer) && _PyBytes_Resize(&self->allocated_buffer, transferred)) return NULL; + Py_INCREF(self->allocated_buffer); return self->allocated_buffer; + case TYPE_READ_FROM: + PyBytes_CheckExact(self->read_from.buffer); + + if(transferred != PyBytes_GET_SIZE(self->read_from.buffer) && + _PyBytes_Resize(&self->read_from.buffer, transferred)) + return NULL; + + // The result is a two item tuple: (message, (address, port)) + self->read_from.result = PyTuple_New(2); + // first item: message + PyTuple_SetItem(self->read_from.result, 0, self->read_from.buffer); + // second item: tuple(address, port) + PyTuple_SetItem(self->read_from.result, 1, + unparse_address((SOCKADDR*)&self->read_from.address, + self->read_from.address_length)); + + Py_INCREF(self->read_from.result); + return self->read_from.result; +>>>>>>> bpo-29883: Add UDP to Windows Proactor Event Loop default: return PyLong_FromUnsignedLong((unsigned long) transferred); } @@ -1121,7 +1193,6 @@ parse_address(PyObject *obj, SOCKADDR *Address, int Length) return -1; } - PyDoc_STRVAR( Overlapped_ConnectEx_doc, "ConnectEx(client_handle, address_as_bytes) -> Overlapped[None]\n\n" @@ -1371,6 +1442,196 @@ Overlapped_traverse(OverlappedObject *self, visitproc visit, void *arg) return 0; } +// UDP functions + +PyDoc_STRVAR( + Overlapped_WSAConnect_doc, + "WSAConnect(client_handle, address_as_bytes) -> Overlapped[None]\n\n" + "Bind a remote address to a connectionless (UDP) socket"); + +/* + * Note: WSAConnect does not support Overlapped I/O so this function should + * _only_ be used for connection-less sockets (UDP). + */ +static PyObject * +Overlapped_WSAConnect(OverlappedObject *self, PyObject *args) { + SOCKET ConnectSocket; + PyObject *AddressObj; + char AddressBuf[sizeof(struct sockaddr_in6)]; + SOCKADDR *Address = (SOCKADDR*)AddressBuf; + int Length; + DWORD err; + + if(!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj)) + return NULL; + + if(self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + Length = sizeof(AddressBuf); + Length = parse_address(AddressObj, Address, Length); + if(Length < 0) + return NULL; + + //self->type = TYPE_CONNECT; + //self->handle = (HANDLE)ConnectSocket; + + Py_BEGIN_ALLOW_THREADS + // WSAConnect does not support overlapped I/O so this call will + // successfully complete immediately. + err = WSAConnect(ConnectSocket, Address, Length, + NULL, NULL, NULL, NULL); + Py_END_ALLOW_THREADS + + err = err == ERROR_SUCCESS ? 0 : WSAGetLastError(); + + // Reset the operation type since WSAConnect finishes immediately + //self->type = TYPE_NOT_STARTED; + //self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError(); + + switch(err) { + case ERROR_SUCCESS: + Py_RETURN_NONE; + default: + return SetFromWindowsErr(WSAGetLastError()); + } +} + +PyDoc_STRVAR( + Overlapped_WSASendTo_doc, + "WSASendTo(handle, buf, flags, address_as_bytes) -> Overlapped[bytes_transferred]\n\n" + "Start overlapped sendto over a connection-less (UDP) socket"); + +static PyObject * +Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) { + HANDLE handle; + PyObject *bufobj; + DWORD flags; + PyObject *AddressObj; + char AddressBuf[sizeof(struct sockaddr_in6)]; + SOCKADDR *Address = (SOCKADDR*)AddressBuf; + int AddressLength; + DWORD written; + WSABUF wsabuf; + int ret; + DWORD err; + + if(!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD "O", + &handle, &bufobj, &flags, &AddressObj)) + return NULL; + + // Parse the "to" address + AddressLength = sizeof(AddressBuf); + AddressLength = parse_address(AddressObj, Address, AddressLength); + if(AddressLength < 0) + return NULL; + + if(self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + if(!PyArg_Parse(bufobj, "y*", &self->user_buffer)) + return NULL; + +#if SIZEOF_SIZE_T > SIZEOF_LONG + if(self->user_buffer.len > (Py_ssize_t)ULONG_MAX) { + PyBuffer_Release(&self->user_buffer); + PyErr_SetString(PyExc_ValueError, "buffer to large"); + return NULL; + } +#endif + + self->type = TYPE_WRITE_TO; + self->handle = handle; + wsabuf.len = (DWORD)self->user_buffer.len; + wsabuf.buf = self->user_buffer.buf; + + Py_BEGIN_ALLOW_THREADS + ret = WSASendTo((SOCKET)handle, &wsabuf, 1, &written, flags, + Address, AddressLength, &self->overlapped, NULL); + Py_END_ALLOW_THREADS + + self->error = err = (ret == SOCKET_ERROR ? WSAGetLastError() : ERROR_SUCCESS); + + switch(err) { + case ERROR_SUCCESS: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); + } +} + + + +PyDoc_STRVAR( + Overlapped_WSARecvFrom_doc, + "RecvFile(handle, size, flags) -> Overlapped[(message, (host, port))]\n\n" + "Start overlapped receive"); + +static PyObject * +Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args) { + HANDLE handle; + DWORD size; + DWORD flags = 0; + DWORD nread; + PyObject *buf; + WSABUF wsabuf; + int ret; + DWORD err; + + if(!PyArg_ParseTuple(args, F_HANDLE F_DWORD "|" F_DWORD, + &handle, &size, &flags)) + return NULL; + + if(self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + +#if SIZEOF_SIZE_T <= SIZEOF_LONG + size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX); +#endif + buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1)); + if(buf == NULL) + return NULL; + + wsabuf.len = size; + wsabuf.buf = PyBytes_AS_STRING(buf); + + self->type = TYPE_READ_FROM; + self->handle = handle; + self->read_from.buffer = buf; + memset(&self->read_from.address, 0, sizeof(self->read_from.address)); + self->read_from.address_length = sizeof(self->read_from.address); + + Py_BEGIN_ALLOW_THREADS + ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, + (SOCKADDR*)&self->read_from.address, + &self->read_from.address_length, + &self->overlapped, NULL); + Py_END_ALLOW_THREADS + + self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + switch(err) { + case ERROR_BROKEN_PIPE: + mark_as_completed(&self->overlapped); + return SetFromWindowsErr(err); + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); + } +>>>>>>> bpo-29883: Add UDP to Windows Proactor Event Loop +} + static PyMethodDef Overlapped_methods[] = { {"getresult", (PyCFunction) Overlapped_getresult, @@ -1399,6 +1660,10 @@ static PyMethodDef Overlapped_methods[] = { METH_VARARGS, Overlapped_TransmitFile_doc}, {"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe, METH_VARARGS, Overlapped_ConnectNamedPipe_doc}, + { "WSARecvFrom", Overlapped_WSARecvFrom, + METH_VARARGS, Overlapped_WSARecvFrom_doc }, + { "WSASendTo", Overlapped_WSASendTo, + METH_VARARGS, Overlapped_WSASendTo_doc }, {NULL} }; @@ -1485,8 +1750,10 @@ static PyMethodDef overlapped_functions[] = { {"ResetEvent", overlapped_ResetEvent, METH_VARARGS, ResetEvent_doc}, {"ConnectPipe", - (PyCFunction) ConnectPipe, + (PyCFunction)ConnectPipe, METH_VARARGS, ConnectPipe_doc}, + {"WSAConnect", Overlapped_WSAConnect, + METH_VARARGS, Overlapped_WSAConnect_doc}, {NULL} }; From 1c72ea59f57e1e3706051ceaf2727337fbbbce6e Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Fri, 15 Dec 2017 22:55:54 -0500 Subject: [PATCH 02/42] fix AppVeyor issues --- Modules/overlapped.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 0ebac151c9fee9..74976d93ccb64f 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -781,7 +781,6 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) Py_INCREF(self->read_from.result); return self->read_from.result; ->>>>>>> bpo-29883: Add UDP to Windows Proactor Event Loop default: return PyLong_FromUnsignedLong((unsigned long) transferred); } @@ -1660,9 +1659,9 @@ static PyMethodDef Overlapped_methods[] = { METH_VARARGS, Overlapped_TransmitFile_doc}, {"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe, METH_VARARGS, Overlapped_ConnectNamedPipe_doc}, - { "WSARecvFrom", Overlapped_WSARecvFrom, + {"WSARecvFrom", (PyCFunction) Overlapped_WSARecvFrom, METH_VARARGS, Overlapped_WSARecvFrom_doc }, - { "WSASendTo", Overlapped_WSASendTo, + {"WSASendTo", (PyCFunction) Overlapped_WSASendTo, METH_VARARGS, Overlapped_WSASendTo_doc }, {NULL} }; From 668d6d7a1ff28a8d66427718234decd2bdf91019 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 16 Dec 2017 11:44:55 -0500 Subject: [PATCH 03/42] fixed UDP proactor tests --- Lib/test/test_asyncio/test_proactor_events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index b7626810695b8b..6b95fae3f599a8 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -978,7 +978,7 @@ def test_sock_sendfile_not_regular_file(self): self.assertEqual(self.file.tell(), 0) def datagram_transport(self): - self.protocol = make_test_protocol(asyncio.DatagramProtocol) + self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) return self.loop._make_datagram_transport(self.sock, self.protocol) def test_make_datagram_transport(self): @@ -1023,7 +1023,7 @@ def test_datagram_loop_reading_no_data(self): tr.close = mock.Mock() tr._read_fut = res tr._loop_reading(res) - self.assertFalse(self.loop._proactor.recvfrom.called) + self.assertTrue(self.loop._proactor.recvfrom.called) self.assertFalse(self.protocol.error_received.called) self.assertFalse(tr.close.called) @@ -1045,9 +1045,9 @@ def test_datagram_loop_writing_aborted(self): tr = self.datagram_transport() tr._fatal_error = mock.Mock() tr._protocol.error_received = mock.Mock() - tr._protocol.error_received.assert_called_with(err) tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068))) tr._loop_writing() + tr._protocol.error_received.assert_called_with(err) tr._fatal_error.assert_called_with( err, 'Fatal error sending UDP datagram') From 05a704c6e9c22936939acddc032755746a2c31c9 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 16 Dec 2017 11:45:15 -0500 Subject: [PATCH 04/42] add missing cast to PyCFunction --- Modules/overlapped.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 74976d93ccb64f..65128441e7acad 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1751,7 +1751,7 @@ static PyMethodDef overlapped_functions[] = { {"ConnectPipe", (PyCFunction)ConnectPipe, METH_VARARGS, ConnectPipe_doc}, - {"WSAConnect", Overlapped_WSAConnect, + {"WSAConnect", (PyCFunction) Overlapped_WSAConnect, METH_VARARGS, Overlapped_WSAConnect_doc}, {NULL} }; From 685ed0f145b063d86285de00be68603f9d114407 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 16 Dec 2017 12:14:42 -0500 Subject: [PATCH 05/42] implement extra data for proactor UDP transport --- Lib/asyncio/proactor_events.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index a8d8c231b47a57..957224a057fcf4 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -439,6 +439,26 @@ def __init__(self, loop, sock, protocol, address=None, self._buffer = collections.deque() self._loop.call_soon(self._loop_reading) + def _set_extra(self, sock): + super(_ProactorDatagramTransport, self)._set_extra(sock) + self._extra['socket'] = sock + + try: + self._extra['sockname'] = sock.getsockname() + except (socket.error, AttributeError): + if self._loop.get_debug(): + logger.warning( + "getsockname() failed on %r", sock, exc_info=True) + + if 'peername' not in self._extra: + try: + self._extra['peername'] = sock.getpeername() + except (socket.error, AttributeError): + if self._loop.get_debug(): + logger.warning("getpeername() failed on %r", + sock, exc_info=True) + + def abort(self): self._force_close(None) From 038739e7e887e088bd95ed3e62f89aaa89ce4ce1 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 16 Dec 2017 12:17:54 -0500 Subject: [PATCH 06/42] enable UDP proactor unit tests on Windows --- Lib/test/test_asyncio/test_events.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index b46b614e556eae..76bed59eb2c4c4 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1252,11 +1252,6 @@ def datagram_received(self, data, addr): server.transport.close() def test_create_datagram_endpoint_sock(self): - if (sys.platform == 'win32' and - isinstance(self.loop, proactor_events.BaseProactorEventLoop)): - raise unittest.SkipTest( - 'UDP is not supported with proactor event loops') - sock = None local_address = ('127.0.0.1', 0) infos = self.loop.run_until_complete( @@ -2009,10 +2004,6 @@ def test_writer_callback(self): def test_writer_callback_cancel(self): raise unittest.SkipTest("IocpEventLoop does not have add_writer()") - def test_create_datagram_endpoint(self): - raise unittest.SkipTest( - "IocpEventLoop does not have create_datagram_endpoint()") - def test_remove_fds_after_closing(self): raise unittest.SkipTest("IocpEventLoop does not have add_reader()") else: From ae141186c85e436c62f7098a815d3b15b4ab4670 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 15 Sep 2018 11:28:17 -0400 Subject: [PATCH 07/42] fix bad rebase of datagram proactor tests --- Lib/test/test_asyncio/test_proactor_events.py | 155 +++++++++--------- 1 file changed, 80 insertions(+), 75 deletions(-) diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 6b95fae3f599a8..9684aa98d4ef7e 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -865,6 +865,86 @@ def test_stop_serving(self): self.assertFalse(sock2.close.called) self.assertFalse(future2.cancel.called) + def datagram_transport(self): + self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) + return self.loop._make_datagram_transport(self.sock, self.protocol) + + def test_make_datagram_transport(self): + tr = self.datagram_transport() + self.assertIsInstance(tr, _ProactorDatagramTransport) + close_transport(tr) + + def test_datagram_loop_writing(self): + tr = self.datagram_transport() + tr._buffer.appendleft((b'data', ('127.0.0.1', 12068))) + tr._loop_writing() + self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068)) + self.loop._proactor.sendto.return_value.add_done_callback.\ + assert_called_with(tr._loop_writing) + + close_transport(tr) + + def test_datagram_loop_reading(self): + tr = self.datagram_transport() + tr._loop_reading() + self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096) + self.assertFalse(self.protocol.datagram_received.called) + self.assertFalse(self.protocol.error_received.called) + close_transport(tr) + + def test_datagram_loop_reading_data(self): + res = asyncio.Future(loop=self.loop) + res.set_result((b'data', ('127.0.0.1', 12068))) + + tr = self.datagram_transport() + tr._read_fut = res + tr._loop_reading(res) + self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096) + self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068)) + close_transport(tr) + + def test_datagram_loop_reading_no_data(self): + res = asyncio.Future(loop=self.loop) + res.set_result((b'', ('127.0.0.1', 12068))) + + tr = self.datagram_transport() + self.assertRaises(AssertionError, tr._loop_reading, res) + + tr.close = mock.Mock() + tr._read_fut = res + tr._loop_reading(res) + self.assertTrue(self.loop._proactor.recvfrom.called) + self.assertFalse(self.protocol.error_received.called) + self.assertFalse(tr.close.called) + close_transport(tr) + + def test_datagram_loop_reading_aborted(self): + err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError() + + tr = self.datagram_transport() + tr._fatal_error = mock.Mock() + tr._protocol.error_received = mock.Mock() + tr._loop_reading() + tr._protocol.error_received.assert_called_with(err) + tr._fatal_error.assert_called_with( + err, + 'Fatal error reading from UDP endpoint') + close_transport(tr) + + def test_datagram_loop_writing_aborted(self): + err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError() + + tr = self.datagram_transport() + tr._fatal_error = mock.Mock() + tr._protocol.error_received = mock.Mock() + tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068))) + tr._loop_writing() + tr._protocol.error_received.assert_called_with(err) + tr._fatal_error.assert_called_with( + err, + 'Fatal error sending UDP datagram') + close_transport(tr) + @unittest.skipIf(sys.platform != 'win32', 'Proactor is supported on Windows only') @@ -977,81 +1057,6 @@ def test_sock_sendfile_not_regular_file(self): 0, None)) self.assertEqual(self.file.tell(), 0) - def datagram_transport(self): - self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) - return self.loop._make_datagram_transport(self.sock, self.protocol) - - def test_make_datagram_transport(self): - tr = self.datagram_transport() - self.assertIsInstance(tr, _ProactorDatagramTransport) - close_transport(tr) - - def test_datagram_loop_writing(self): - tr = self.datagram_transport() - tr._buffer.appendleft((b'data', ('127.0.0.1', 12068))) - tr._loop_writing() - self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068)) - self.loop._proactor.sendto.return_value.add_done_callback.\ - assert_called_with(tr._loop_writing) - - close_transport(tr) - - def test_datagram_loop_reading(self): - tr = self.datagram_transport() - tr._loop_reading() - self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096) - self.assertFalse(self.protocol.datagram_received.called) - self.assertFalse(self.protocol.error_received.called) - - def test_datagram_loop_reading_data(self): - res = asyncio.Future(loop=self.loop) - res.set_result((b'data', ('127.0.0.1', 12068))) - - tr = self.datagram_transport() - tr._read_fut = res - tr._loop_reading(res) - self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096) - self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068)) - - def test_datagram_loop_reading_no_data(self): - res = asyncio.Future(loop=self.loop) - res.set_result((b'', ('127.0.0.1', 12068))) - - tr = self.datagram_transport() - self.assertRaises(AssertionError, tr._loop_reading, res) - - tr.close = mock.Mock() - tr._read_fut = res - tr._loop_reading(res) - self.assertTrue(self.loop._proactor.recvfrom.called) - self.assertFalse(self.protocol.error_received.called) - self.assertFalse(tr.close.called) - - def test_datagram_loop_reading_aborted(self): - err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError() - - tr = self.datagram_transport() - tr._fatal_error = mock.Mock() - tr._protocol.error_received = mock.Mock() - tr._loop_reading() - tr._protocol.error_received.assert_called_with(err) - tr._fatal_error.assert_called_with( - err, - 'Fatal error reading from UDP endpoint') - - def test_datagram_loop_writing_aborted(self): - err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError() - - tr = self.datagram_transport() - tr._fatal_error = mock.Mock() - tr._protocol.error_received = mock.Mock() - tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068))) - tr._loop_writing() - tr._protocol.error_received.assert_called_with(err) - tr._fatal_error.assert_called_with( - err, - 'Fatal error sending UDP datagram') - if __name__ == '__main__': unittest.main() From 93ce3f8b21f80194e334600096d92e648c7f4dd5 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 15 Sep 2018 11:37:41 -0400 Subject: [PATCH 08/42] add news item for bpo29883 --- .../next/Windows/2018-09-15-11-36-55.bpo-29883.HErerE.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 Misc/NEWS.d/next/Windows/2018-09-15-11-36-55.bpo-29883.HErerE.rst diff --git a/Misc/NEWS.d/next/Windows/2018-09-15-11-36-55.bpo-29883.HErerE.rst b/Misc/NEWS.d/next/Windows/2018-09-15-11-36-55.bpo-29883.HErerE.rst new file mode 100644 index 00000000000000..b6d1375c775263 --- /dev/null +++ b/Misc/NEWS.d/next/Windows/2018-09-15-11-36-55.bpo-29883.HErerE.rst @@ -0,0 +1,2 @@ +Add Windows support for UDP transports for the Proactor Event Loop. Patch by +Adam Meily. From 781e2efeaa110e7ffde44f33e233c0553fd9c9bd Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 15 Sep 2018 12:55:56 -0400 Subject: [PATCH 09/42] fix bad rebase with missing finish_recv() in recv_into() --- Lib/asyncio/windows_events.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index c94147991a7480..70826120a4bf95 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -469,6 +469,18 @@ def recv_into(self, conn, buf, flags=0): except BrokenPipeError: return self._result(b'') + def finish_recv(trans, key, ov): + try: + return ov.getresult() + except OSError as exc: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): + raise ConnectionResetError(*exc.args) + else: + raise + + return self._register(ov, conn, finish_recv) + def recvfrom(self, conn, nbytes, flags=0): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) From 8761fcdb5e60c255f90f56b6149fe8d9a443d84d Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 15 Sep 2018 13:40:59 -0400 Subject: [PATCH 10/42] change WSAConnect to a module method --- Modules/overlapped.c | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 65128441e7acad..0d7a9e5d1b48fa 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1444,7 +1444,7 @@ Overlapped_traverse(OverlappedObject *self, visitproc visit, void *arg) // UDP functions PyDoc_STRVAR( - Overlapped_WSAConnect_doc, + overlapped_WSAConnect_doc, "WSAConnect(client_handle, address_as_bytes) -> Overlapped[None]\n\n" "Bind a remote address to a connectionless (UDP) socket"); @@ -1453,7 +1453,7 @@ PyDoc_STRVAR( * _only_ be used for connection-less sockets (UDP). */ static PyObject * -Overlapped_WSAConnect(OverlappedObject *self, PyObject *args) { +overlapped_WSAConnect(PyObject *self, PyObject *args) { SOCKET ConnectSocket; PyObject *AddressObj; char AddressBuf[sizeof(struct sockaddr_in6)]; @@ -1464,19 +1464,11 @@ Overlapped_WSAConnect(OverlappedObject *self, PyObject *args) { if(!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj)) return NULL; - if(self->type != TYPE_NONE) { - PyErr_SetString(PyExc_ValueError, "operation already attempted"); - return NULL; - } - Length = sizeof(AddressBuf); Length = parse_address(AddressObj, Address, Length); if(Length < 0) return NULL; - //self->type = TYPE_CONNECT; - //self->handle = (HANDLE)ConnectSocket; - Py_BEGIN_ALLOW_THREADS // WSAConnect does not support overlapped I/O so this call will // successfully complete immediately. @@ -1486,10 +1478,6 @@ Overlapped_WSAConnect(OverlappedObject *self, PyObject *args) { err = err == ERROR_SUCCESS ? 0 : WSAGetLastError(); - // Reset the operation type since WSAConnect finishes immediately - //self->type = TYPE_NOT_STARTED; - //self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError(); - switch(err) { case ERROR_SUCCESS: Py_RETURN_NONE; @@ -1751,8 +1739,8 @@ static PyMethodDef overlapped_functions[] = { {"ConnectPipe", (PyCFunction)ConnectPipe, METH_VARARGS, ConnectPipe_doc}, - {"WSAConnect", (PyCFunction) Overlapped_WSAConnect, - METH_VARARGS, Overlapped_WSAConnect_doc}, + {"WSAConnect", (PyCFunction) overlapped_WSAConnect, + METH_VARARGS, overlapped_WSAConnect_doc}, {NULL} }; From 5e3d4d32a527b4727c8d26cf388e2b7690fb046f Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 22 Sep 2018 10:46:13 -0400 Subject: [PATCH 11/42] PEP 7 fixes and address PR feedback --- Modules/overlapped.c | 96 ++++++++++++++++++++++++++++---------------- 1 file changed, 61 insertions(+), 35 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 0d7a9e5d1b48fa..575aae541526fe 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -39,7 +39,8 @@ enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_READINTO, TYPE_WRITE, TYPE_ACCEPT, TYPE_CONNECT, TYPE_DISCONNECT, TYPE_CONNECT_NAMED_PIPE, - TYPE_WAIT_NAMED_PIPE_AND_CONNECT, TYPE_TRANSMIT_FILE, TYPE_READ_FROM, TYPE_WRITE_TO}; + TYPE_WAIT_NAMED_PIPE_AND_CONNECT, TYPE_TRANSMIT_FILE, TYPE_READ_FROM, + TYPE_WRITE_TO}; typedef struct { PyObject_HEAD @@ -56,7 +57,8 @@ typedef struct { /* Buffer passed by the user: TYPE_WRITE, TYPE_WRITE_TO, and TYPE_READINTO */ Py_buffer user_buffer; - /* Data used for reading from a connection-less socket: TYPE_READ_FROM */ + /* Data used for reading from a connection-less socket: + TYPE_READ_FROM */ struct { // A (buffer, (host, port)) tuple PyObject *result; @@ -720,6 +722,7 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) DWORD transferred = 0; BOOL ret; DWORD err; + PyObject *addr; if (!PyArg_ParseTuple(args, "|" F_BOOL, &wait)) return NULL; @@ -745,10 +748,15 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) case ERROR_MORE_DATA: break; case ERROR_BROKEN_PIPE: - if (self->type == TYPE_READ || self->type == TYPE_READINTO) + if (self->type == TYPE_READ || self->type == TYPE_READINTO) { break; - else if(self->type == TYPE_READ_FROM && (self->read_from.result != NULL || self->read_from.buffer != NULL)) + } + else if (self->type == TYPE_READ_FROM && + (self->read_from.result != NULL || + self->read_from.buffer != NULL)) + { break; + } /* fall through */ default: return SetFromWindowsErr(err); @@ -764,22 +772,28 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) Py_INCREF(self->allocated_buffer); return self->allocated_buffer; case TYPE_READ_FROM: - PyBytes_CheckExact(self->read_from.buffer); + assert(PyBytes_CheckExact(self->read_from.buffer)); - if(transferred != PyBytes_GET_SIZE(self->read_from.buffer) && - _PyBytes_Resize(&self->read_from.buffer, transferred)) + if (transferred != PyBytes_GET_SIZE(self->read_from.buffer) && + _PyBytes_Resize(&self->read_from.buffer, transferred)) + { return NULL; + } + + // unparse the address + addr = unparse_address((SOCKADDR*)&self->read_from.address, + self->read_from.address_length); // The result is a two item tuple: (message, (address, port)) self->read_from.result = PyTuple_New(2); // first item: message PyTuple_SetItem(self->read_from.result, 0, self->read_from.buffer); // second item: tuple(address, port) - PyTuple_SetItem(self->read_from.result, 1, - unparse_address((SOCKADDR*)&self->read_from.address, - self->read_from.address_length)); + PyTuple_SetItem(self->read_from.result, 1, addr); Py_INCREF(self->read_from.result); + Py_INCREF(self->read_from.addr); + Py_INCREF(self->read_from.buffer); return self->read_from.result; default: return PyLong_FromUnsignedLong((unsigned long) transferred); @@ -1461,19 +1475,21 @@ overlapped_WSAConnect(PyObject *self, PyObject *args) { int Length; DWORD err; - if(!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj)) + if (!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj)) { return NULL; + } Length = sizeof(AddressBuf); Length = parse_address(AddressObj, Address, Length); - if(Length < 0) + if (Length < 0) { return NULL; + } Py_BEGIN_ALLOW_THREADS - // WSAConnect does not support overlapped I/O so this call will - // successfully complete immediately. - err = WSAConnect(ConnectSocket, Address, Length, - NULL, NULL, NULL, NULL); + // WSAConnect does not support overlapped I/O so this call will + // successfully complete immediately. + err = WSAConnect(ConnectSocket, Address, Length, + NULL, NULL, NULL, NULL); Py_END_ALLOW_THREADS err = err == ERROR_SUCCESS ? 0 : WSAGetLastError(); @@ -1488,7 +1504,8 @@ overlapped_WSAConnect(PyObject *self, PyObject *args) { PyDoc_STRVAR( Overlapped_WSASendTo_doc, - "WSASendTo(handle, buf, flags, address_as_bytes) -> Overlapped[bytes_transferred]\n\n" + "WSASendTo(handle, buf, flags, address_as_bytes) -> " + "Overlapped[bytes_transferred]\n\n" "Start overlapped sendto over a connection-less (UDP) socket"); static PyObject * @@ -1505,26 +1522,30 @@ Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) { int ret; DWORD err; - if(!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD "O", - &handle, &bufobj, &flags, &AddressObj)) + if (!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD "O", + &handle, &bufobj, &flags, &AddressObj)) + { return NULL; + } // Parse the "to" address AddressLength = sizeof(AddressBuf); AddressLength = parse_address(AddressObj, Address, AddressLength); - if(AddressLength < 0) + if (AddressLength < 0) { return NULL; + } - if(self->type != TYPE_NONE) { + if (self->type != TYPE_NONE) { PyErr_SetString(PyExc_ValueError, "operation already attempted"); return NULL; } - if(!PyArg_Parse(bufobj, "y*", &self->user_buffer)) + if (!PyArg_Parse(bufobj, "y*", &self->user_buffer)) { return NULL; + } #if SIZEOF_SIZE_T > SIZEOF_LONG - if(self->user_buffer.len > (Py_ssize_t)ULONG_MAX) { + if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) { PyBuffer_Release(&self->user_buffer); PyErr_SetString(PyExc_ValueError, "buffer to large"); return NULL; @@ -1537,11 +1558,12 @@ Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) { wsabuf.buf = self->user_buffer.buf; Py_BEGIN_ALLOW_THREADS - ret = WSASendTo((SOCKET)handle, &wsabuf, 1, &written, flags, - Address, AddressLength, &self->overlapped, NULL); + ret = WSASendTo((SOCKET)handle, &wsabuf, 1, &written, flags, + Address, AddressLength, &self->overlapped, NULL); Py_END_ALLOW_THREADS - self->error = err = (ret == SOCKET_ERROR ? WSAGetLastError() : ERROR_SUCCESS); + self->error = err = (ret == SOCKET_ERROR ? WSAGetLastError() : + ERROR_SUCCESS); switch(err) { case ERROR_SUCCESS: @@ -1571,11 +1593,13 @@ Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args) { int ret; DWORD err; - if(!PyArg_ParseTuple(args, F_HANDLE F_DWORD "|" F_DWORD, - &handle, &size, &flags)) + if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD "|" F_DWORD, + &handle, &size, &flags)) + { return NULL; + } - if(self->type != TYPE_NONE) { + if (self->type != TYPE_NONE) { PyErr_SetString(PyExc_ValueError, "operation already attempted"); return NULL; } @@ -1584,8 +1608,9 @@ Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args) { size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX); #endif buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1)); - if(buf == NULL) + if (buf == NULL) { return NULL; + } wsabuf.len = size; wsabuf.buf = PyBytes_AS_STRING(buf); @@ -1597,13 +1622,14 @@ Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args) { self->read_from.address_length = sizeof(self->read_from.address); Py_BEGIN_ALLOW_THREADS - ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, - (SOCKADDR*)&self->read_from.address, - &self->read_from.address_length, - &self->overlapped, NULL); + ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, + (SOCKADDR*)&self->read_from.address, + &self->read_from.address_length, + &self->overlapped, NULL); Py_END_ALLOW_THREADS - self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + switch(err) { case ERROR_BROKEN_PIPE: mark_as_completed(&self->overlapped); From d25833dc38c883ffea86da532a679733f0ae446c Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Sat, 22 Sep 2018 11:04:45 -0400 Subject: [PATCH 12/42] fix build error --- Modules/overlapped.c | 1 - 1 file changed, 1 deletion(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 575aae541526fe..9662ceeb5dcc20 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -792,7 +792,6 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) PyTuple_SetItem(self->read_from.result, 1, addr); Py_INCREF(self->read_from.result); - Py_INCREF(self->read_from.addr); Py_INCREF(self->read_from.buffer); return self->read_from.result; default: From 16d2ce76ca06134b4cba1b12b465a83bae8a7bf1 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Wed, 26 Sep 2018 17:32:27 -0400 Subject: [PATCH 13/42] simplify calls to super() --- Lib/asyncio/proactor_events.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 957224a057fcf4..197552b9a32ba2 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -430,9 +430,7 @@ class _ProactorDatagramTransport(_ProactorBasePipeTransport): def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): - super(_ProactorDatagramTransport, self).__init__(loop, sock, protocol, - waiter=waiter, - extra=extra) + super().__init__(loop, sock, protocol, waiter=waiter, extra=extra) self._address = address # We don't need to call _protocol.connection_made() since our base # constructor does it for us. @@ -440,7 +438,7 @@ def __init__(self, loop, sock, protocol, address=None, self._loop.call_soon(self._loop_reading) def _set_extra(self, sock): - super(_ProactorDatagramTransport, self)._set_extra(sock) + super()._set_extra(sock) self._extra['socket'] = sock try: From da40f283a0cf6293aa6ee53fbe217f1f8193e2fc Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Tue, 2 Oct 2018 23:39:59 -0400 Subject: [PATCH 14/42] address PR feedback --- Doc/library/asyncio-platforms.rst | 3 - Lib/asyncio/proactor_events.py | 145 +++++++++--------- Lib/asyncio/windows_events.py | 4 +- Lib/test/test_asyncio/test_proactor_events.py | 6 - 4 files changed, 77 insertions(+), 81 deletions(-) diff --git a/Doc/library/asyncio-platforms.rst b/Doc/library/asyncio-platforms.rst index 81d840e23277ea..7e4a70f91c6ed5 100644 --- a/Doc/library/asyncio-platforms.rst +++ b/Doc/library/asyncio-platforms.rst @@ -53,9 +53,6 @@ All event loops on Windows do not support the following methods: :class:`ProactorEventLoop` has the following limitations: -* The :meth:`loop.create_datagram_endpoint` method - is not supported. - * The :meth:`loop.add_reader` and :meth:`loop.add_writer` methods are not supported. diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 197552b9a32ba2..5ac4ec5ba84344 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -23,6 +23,25 @@ from .log import logger +def _set_socket_extra(transport, sock): + transport._extra['socket'] = sock + + try: + transport._extra['sockname'] = sock.getsockname() + except (socket.error, AttributeError): + if transport._loop.get_debug(): + logger.warning( + "getsockname() failed on %r", sock, exc_info=True) + + if 'peername' not in transport._extra: + try: + transport._extra['peername'] = sock.getpeername() + except (socket.error, AttributeError): + if transport._loop.get_debug(): + logger.warning("getpeername() failed on %r", + sock, exc_info=True) + + class _ProactorBasePipeTransport(transports._FlowControlMixin, transports.BaseTransport): """Base class for pipe and socket transports.""" @@ -430,32 +449,18 @@ class _ProactorDatagramTransport(_ProactorBasePipeTransport): def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): - super().__init__(loop, sock, protocol, waiter=waiter, extra=extra) self._address = address + self._empty_waiter = None # We don't need to call _protocol.connection_made() since our base # constructor does it for us. + super().__init__(loop, sock, protocol, waiter=waiter, extra=extra) + + # The base constructor sets _buffer = None, so we set it here self._buffer = collections.deque() self._loop.call_soon(self._loop_reading) def _set_extra(self, sock): - super()._set_extra(sock) - self._extra['socket'] = sock - - try: - self._extra['sockname'] = sock.getsockname() - except (socket.error, AttributeError): - if self._loop.get_debug(): - logger.warning( - "getsockname() failed on %r", sock, exc_info=True) - - if 'peername' not in self._extra: - try: - self._extra['peername'] = sock.getpeername() - except (socket.error, AttributeError): - if self._loop.get_debug(): - logger.warning("getpeername() failed on %r", - sock, exc_info=True) - + _set_socket_extra(self, sock) def abort(self): self._force_close(None) @@ -468,8 +473,14 @@ def sendto(self, data, addr=None): if not data: return + if self._address and addr not in (None, self._address): + raise ValueError( + f'Invalid address: must be None or {self._address}') + if self._conn_lost and self._address: - # close() or force_close() has been called on the bound endpoint + if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: + logger.warning('socket.sendto() raised exception.') + self._conn_lost += 1 return self._buffer.appendleft((data, addr)) @@ -477,67 +488,77 @@ def sendto(self, data, addr=None): if self._write_fut is None: # No current write operations are active, kick one off self._loop_writing() - else: - # A write operation is already kicked off - pass + # else: A write operation is already kicked off def _loop_writing(self, fut=None): - if self._conn_lost: - return - - assert fut is self._write_fut - if fut: - # We are in a _loop_writing() done callback, get the result - fut.result() + try: + if self._conn_lost: + return - if not self._buffer or (self._conn_lost and self._address): - # The connection has been closed + assert fut is self._write_fut self._write_fut = None - return + if fut: + # We are in a _loop_writing() done callback, get the result + fut.result() - data, addr = self._buffer.pop() + if not self._buffer or (self._conn_lost and self._address): + # The connection has been closed + if self._closing: + self._loop.call_soon(self._call_connection_lost, None) + return - self._write_fut = None - try: + data, addr = self._buffer.pop() if self._address: self._write_fut = self._loop._proactor.send(self._sock, data) else: self._write_fut = self._loop._proactor.sendto(self._sock, data, addr=addr) except OSError as exc: self._protocol.error_received(exc) - self._fatal_error(exc, 'Fatal error sending UDP datagram') + except Exception as exc: + self._fatal_error(exc, 'Fatal write error on datagram transport') else: self._write_fut.add_done_callback(self._loop_writing) def _loop_reading(self, fut=None): - if self._conn_lost: - return - - assert self._read_fut is fut + data = None + try: + if self._conn_lost: + return - if fut: - res = fut.result() + assert self._read_fut is fut or (self._read_fut is None and + self._closing) - if self._address: - data, addr = res, self._address - else: - data, addr = res + self._read_fut = None + if fut is not None: + res = fut.result() - self._protocol.datagram_received(data, addr) + if self._closing: + # since close() has been called we ignore any read data + data = None + return - if self._conn_lost: - return + if self._address: + data, addr = res, self._address + else: + data, addr = res - try: + if self._conn_lost: + return if self._address: self._read_fut = self._loop._proactor.recv(self._sock, 4096) else: self._read_fut = self._loop._proactor.recvfrom(self._sock, 4096) except OSError as exc: self._protocol.error_received(exc) - self._fatal_error(exc, "Fatal error reading from UDP endpoint") + except exceptions.CancelledError: + if not self._closing: + raise else: - self._read_fut.add_done_callback(self._loop_reading) + if self._read_fut is not None: + self._read_fut.add_done_callback(self._loop_reading) + finally: + if data: + self._protocol.datagram_received(data, addr) class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport, @@ -565,22 +586,7 @@ def __init__(self, loop, sock, protocol, waiter=None, base_events._set_nodelay(sock) def _set_extra(self, sock): - self._extra['socket'] = sock - - try: - self._extra['sockname'] = sock.getsockname() - except (socket.error, AttributeError): - if self._loop.get_debug(): - logger.warning( - "getsockname() failed on %r", sock, exc_info=True) - - if 'peername' not in self._extra: - try: - self._extra['peername'] = sock.getpeername() - except (socket.error, AttributeError): - if self._loop.get_debug(): - logger.warning("getpeername() failed on %r", - sock, exc_info=True) + _set_socket_extra(self, sock) def can_write_eof(self): return True @@ -621,7 +627,6 @@ def _make_ssl_transport( self, protocol, sslcontext, waiter, server_side, server_hostname, ssl_handshake_timeout=ssl_handshake_timeout) - _ProactorSocketTransport(self, rawsock, ssl_protocol, extra=extra, server=server) return ssl_protocol._app_transport diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 70826120a4bf95..0ece7bd80fab60 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -511,7 +511,8 @@ def finish_send(trans, key, ov): try: return ov.getresult() except OSError as exc: - if exc.winerror == _overlapped.ERROR_NETNAME_DELETED: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: raise @@ -572,7 +573,6 @@ def connect(self, conn, address): # need to register any IOCP operation _overlapped.WSAConnect(conn.fileno(), address) fut = self._loop.create_future() - fut.set_result(None) return fut diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 9684aa98d4ef7e..e094d080770c59 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -926,9 +926,6 @@ def test_datagram_loop_reading_aborted(self): tr._protocol.error_received = mock.Mock() tr._loop_reading() tr._protocol.error_received.assert_called_with(err) - tr._fatal_error.assert_called_with( - err, - 'Fatal error reading from UDP endpoint') close_transport(tr) def test_datagram_loop_writing_aborted(self): @@ -940,9 +937,6 @@ def test_datagram_loop_writing_aborted(self): tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068))) tr._loop_writing() tr._protocol.error_received.assert_called_with(err) - tr._fatal_error.assert_called_with( - err, - 'Fatal error sending UDP datagram') close_transport(tr) From 2962f5cf846dc11c731e1de30bcb1598cd2934b9 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Thu, 8 Nov 2018 14:43:24 +0200 Subject: [PATCH 15/42] Update asyncio-eventloop.rst --- Doc/library/asyncio-eventloop.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 571bc1db36da98..15f9706827000d 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -489,7 +489,7 @@ Opening network connections The *family*, *proto*, *flags*, *reuse_address*, *reuse_port, *allow_broadcast*, and *sock* parameters were added. - .. versionchanged:: 3.8.0 + .. versionchanged:: 3.8 Added support for Windows. .. coroutinemethod:: loop.create_unix_connection(protocol_factory, \ From ce73a323827cae629418e3ba2aaa3320355752ec Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Fri, 28 Dec 2018 09:12:51 -0500 Subject: [PATCH 16/42] address feedback from PR --- Lib/asyncio/proactor_events.py | 5 ++-- Modules/overlapped.c | 53 ++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 5ac4ec5ba84344..47399e43ae18d4 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -37,9 +37,8 @@ def _set_socket_extra(transport, sock): try: transport._extra['peername'] = sock.getpeername() except (socket.error, AttributeError): - if transport._loop.get_debug(): - logger.warning("getpeername() failed on %r", - sock, exc_info=True) + # UDP sockets may not have a peer name + transport._extra['peername'] = None class _ProactorBasePipeTransport(transports._FlowControlMixin, diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 9662ceeb5dcc20..82b4ad7b0df71d 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -57,7 +57,7 @@ typedef struct { /* Buffer passed by the user: TYPE_WRITE, TYPE_WRITE_TO, and TYPE_READINTO */ Py_buffer user_buffer; - /* Data used for reading from a connection-less socket: + /* Data used for reading from a connectionless socket: TYPE_READ_FROM */ struct { // A (buffer, (host, port)) tuple @@ -655,11 +655,13 @@ Overlapped_dealloc(OverlappedObject *self) } static PyObject* -unparse_address(LPSOCKADDR Address, DWORD Length) { +unparse_address(LPSOCKADDR Address, DWORD Length) +{ // An IPv6 address has a maximum length of 39 characters - char AddressString[47]; + char AddressString[40]; unsigned int port; PVOID pSinAddr; + DWORD err; switch(Address->sa_family) { case AF_INET: @@ -674,7 +676,11 @@ unparse_address(LPSOCKADDR Address, DWORD Length) { return SetFromWindowsErr(ERROR_INVALID_PARAMETER); } - inet_ntop(Address->sa_family, pSinAddr, AddressString, 47); + if (!inet_ntop(Address->sa_family, pSinAddr, AddressString, + sizeof(AddressString))) + { + return SetFromWindowsErr(WSAGetLastError()); + } return Py_BuildValue("(sH)", AddressString, port); } @@ -786,10 +792,15 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) // The result is a two item tuple: (message, (address, port)) self->read_from.result = PyTuple_New(2); + if (self->read_from.result == NULL) { + return NULL; + } + // first item: message - PyTuple_SetItem(self->read_from.result, 0, self->read_from.buffer); + PyTuple_SET_ITEM(self->read_from.result, 0, + self->read_from.buffer); // second item: tuple(address, port) - PyTuple_SetItem(self->read_from.result, 1, addr); + PyTuple_SET_ITEM(self->read_from.result, 1, addr); Py_INCREF(self->read_from.result); Py_INCREF(self->read_from.buffer); @@ -1463,16 +1474,17 @@ PyDoc_STRVAR( /* * Note: WSAConnect does not support Overlapped I/O so this function should - * _only_ be used for connection-less sockets (UDP). + * _only_ be used for connectionless sockets (UDP). */ static PyObject * -overlapped_WSAConnect(PyObject *self, PyObject *args) { +overlapped_WSAConnect(PyObject *self, PyObject *args) +{ SOCKET ConnectSocket; PyObject *AddressObj; char AddressBuf[sizeof(struct sockaddr_in6)]; SOCKADDR *Address = (SOCKADDR*)AddressBuf; int Length; - DWORD err; + int err; if (!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj)) { return NULL; @@ -1491,12 +1503,10 @@ overlapped_WSAConnect(PyObject *self, PyObject *args) { NULL, NULL, NULL, NULL); Py_END_ALLOW_THREADS - err = err == ERROR_SUCCESS ? 0 : WSAGetLastError(); - - switch(err) { - case ERROR_SUCCESS: + if (err == 0) { Py_RETURN_NONE; - default: + } + else { return SetFromWindowsErr(WSAGetLastError()); } } @@ -1505,10 +1515,11 @@ PyDoc_STRVAR( Overlapped_WSASendTo_doc, "WSASendTo(handle, buf, flags, address_as_bytes) -> " "Overlapped[bytes_transferred]\n\n" - "Start overlapped sendto over a connection-less (UDP) socket"); + "Start overlapped sendto over a connectionless (UDP) socket"); static PyObject * -Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) { +Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) +{ HANDLE handle; PyObject *bufobj; DWORD flags; @@ -1546,7 +1557,7 @@ Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) { #if SIZEOF_SIZE_T > SIZEOF_LONG if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) { PyBuffer_Release(&self->user_buffer); - PyErr_SetString(PyExc_ValueError, "buffer to large"); + PyErr_SetString(PyExc_ValueError, "buffer too large"); return NULL; } #endif @@ -1582,7 +1593,8 @@ PyDoc_STRVAR( "Start overlapped receive"); static PyObject * -Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args) { +Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args) +{ HANDLE handle; DWORD size; DWORD flags = 0; @@ -1761,10 +1773,9 @@ static PyMethodDef overlapped_functions[] = { METH_VARARGS, SetEvent_doc}, {"ResetEvent", overlapped_ResetEvent, METH_VARARGS, ResetEvent_doc}, - {"ConnectPipe", - (PyCFunction)ConnectPipe, + {"ConnectPipe", ConnectPipe, METH_VARARGS, ConnectPipe_doc}, - {"WSAConnect", (PyCFunction) overlapped_WSAConnect, + {"WSAConnect", overlapped_WSAConnect, METH_VARARGS, overlapped_WSAConnect_doc}, {NULL} }; From 4211be9ed295125c2720f4fa863fcd49b5bbb05b Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Fri, 29 Mar 2019 21:20:47 -0400 Subject: [PATCH 17/42] fix bad rebase --- Modules/overlapped.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 82b4ad7b0df71d..31507fe7b52ebf 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -661,7 +661,6 @@ unparse_address(LPSOCKADDR Address, DWORD Length) char AddressString[40]; unsigned int port; PVOID pSinAddr; - DWORD err; switch(Address->sa_family) { case AF_INET: @@ -1653,7 +1652,6 @@ Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args) self->type = TYPE_NOT_STARTED; return SetFromWindowsErr(err); } ->>>>>>> bpo-29883: Add UDP to Windows Proactor Event Loop } From 504ae831397b7c47ebb0225a6113615b4284891c Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Fri, 5 Apr 2019 18:11:20 -0400 Subject: [PATCH 18/42] address PR feedback --- Lib/asyncio/proactor_events.py | 4 ++-- Modules/overlapped.c | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 47399e43ae18d4..975a6dcc88e07f 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -28,7 +28,7 @@ def _set_socket_extra(transport, sock): try: transport._extra['sockname'] = sock.getsockname() - except (socket.error, AttributeError): + except socket.error: if transport._loop.get_debug(): logger.warning( "getsockname() failed on %r", sock, exc_info=True) @@ -36,7 +36,7 @@ def _set_socket_extra(transport, sock): if 'peername' not in transport._extra: try: transport._extra['peername'] = sock.getpeername() - except (socket.error, AttributeError): + except socket.error: # UDP sockets may not have a peer name transport._extra['peername'] = None diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 31507fe7b52ebf..710b16511b2f9f 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -789,6 +789,10 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) addr = unparse_address((SOCKADDR*)&self->read_from.address, self->read_from.address_length); + if (addr == NULL) { + return NULL; + } + // The result is a two item tuple: (message, (address, port)) self->read_from.result = PyTuple_New(2); if (self->read_from.result == NULL) { From 58e9f8d5205ed270cf680f58876e9a489848a892 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Thu, 11 Apr 2019 20:52:10 -0400 Subject: [PATCH 19/42] fix memory leak --- Modules/overlapped.c | 1 + 1 file changed, 1 insertion(+) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 710b16511b2f9f..f994f32575bc67 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -796,6 +796,7 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) // The result is a two item tuple: (message, (address, port)) self->read_from.result = PyTuple_New(2); if (self->read_from.result == NULL) { + Py_CLEAR(addr); return NULL; } From 0c935b0a68ccc2d130cd3f8971248456a6e1d2b2 Mon Sep 17 00:00:00 2001 From: Adam Meily Date: Thu, 2 May 2019 18:16:15 -0400 Subject: [PATCH 20/42] fix memory leak when receiving on UDP endpoint --- Modules/overlapped.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index f994f32575bc67..5ffe4b6dce7e63 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -591,12 +591,10 @@ Overlapped_clear(OverlappedObject *self) // The result tuple of (message, (address, port)) is only // allocated _after_ a message has been received. if(self->read_from.result) { - // We've received a message, free the result tuple which will - // also free the message buffer. + // We've received a message, free the result tuple. Py_CLEAR(self->read_from.result); - self->read_from.buffer = NULL; - } else if(self->read_from.buffer) { - // We haven't received a message, only free the buffer. + } + if(self->read_from.buffer) { Py_CLEAR(self->read_from.buffer); } break; From bb0c1d20378baca737d6b03e4f84e31a23caa5c1 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 15:15:27 +0300 Subject: [PATCH 21/42] Use None address if recvfrom is unexpectedly finished --- Lib/asyncio/windows_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 0ece7bd80fab60..f22b0ad766e2e8 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -487,7 +487,7 @@ def recvfrom(self, conn, nbytes, flags=0): try: ov.WSARecvFrom(conn.fileno(), nbytes, flags) except BrokenPipeError: - return self._result((b'', (None, None))) + return self._result((b'', None)) def finish_recv(trans, key, ov): try: From 43cd9445704b5b4baed297d16b4b00b74203f21c Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 16:02:29 +0300 Subject: [PATCH 22/42] Use explicit check for None in self._address --- Lib/asyncio/proactor_events.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 975a6dcc88e07f..30420149f5d7ca 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -472,7 +472,7 @@ def sendto(self, data, addr=None): if not data: return - if self._address and addr not in (None, self._address): + if self._address is not None and addr not in (None, self._address): raise ValueError( f'Invalid address: must be None or {self._address}') @@ -507,7 +507,7 @@ def _loop_writing(self, fut=None): return data, addr = self._buffer.pop() - if self._address: + if self._address is not None: self._write_fut = self._loop._proactor.send(self._sock, data) else: self._write_fut = self._loop._proactor.sendto(self._sock, data, addr=addr) @@ -536,14 +536,14 @@ def _loop_reading(self, fut=None): data = None return - if self._address: + if self._address is not None: data, addr = res, self._address else: data, addr = res if self._conn_lost: return - if self._address: + if self._address is not None: self._read_fut = self._loop._proactor.recv(self._sock, 4096) else: self._read_fut = self._loop._proactor.recvfrom(self._sock, 4096) From 5eabe0565d3428b14d5906aa36eed3c62e33a690 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 17:17:11 +0300 Subject: [PATCH 23/42] Return (addr, host, flow_info, scope_id) in unparse_address for IPv6 --- Modules/overlapped.c | 152 +++++++++++++++++++++++++++---------------- 1 file changed, 96 insertions(+), 56 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 5ffe4b6dce7e63..97ff39d78999e0 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -582,30 +582,30 @@ static int Overlapped_clear(OverlappedObject *self) { switch (self->type) { - case TYPE_READ: - case TYPE_ACCEPT: - Py_CLEAR(self->allocated_buffer); - break; - case TYPE_READ_FROM: - // An initial call to WSARecvFrom will only allocate the buffer. - // The result tuple of (message, (address, port)) is only - // allocated _after_ a message has been received. - if(self->read_from.result) { - // We've received a message, free the result tuple. - Py_CLEAR(self->read_from.result); - } - if(self->read_from.buffer) { - Py_CLEAR(self->read_from.buffer); - } - break; - case TYPE_WRITE: - case TYPE_WRITE_TO: - case TYPE_READINTO: - if (self->user_buffer.obj) { - PyBuffer_Release(&self->user_buffer); - } - break; - } + case TYPE_READ: + case TYPE_ACCEPT: + Py_CLEAR(self->allocated_buffer); + break; + case TYPE_READ_FROM: + // An initial call to WSARecvFrom will only allocate the buffer. + // The result tuple of (message, address) is only + // allocated _after_ a message has been received. + if(self->read_from.result) { + // We've received a message, free the result tuple. + Py_CLEAR(self->read_from.result); + } + if(self->read_from.buffer) { + Py_CLEAR(self->read_from.buffer); + } + break; + case TYPE_WRITE: + case TYPE_WRITE_TO: + case TYPE_READINTO: + if (self->user_buffer.obj) { + PyBuffer_Release(&self->user_buffer); + } + break; + } self->type = TYPE_NOT_STARTED; return 0; } @@ -652,34 +652,74 @@ Overlapped_dealloc(OverlappedObject *self) SetLastError(olderr); } + +/* Convert IPv4 sockaddr to a Python str. */ + +static PyObject * +make_ipv4_addr(const struct sockaddr_in *addr) +{ + char buf[INET_ADDRSTRLEN]; + if (inet_ntop(AF_INET, &addr->sin_addr, buf, sizeof(buf)) == NULL) { + PyErr_SetFromErrno(PyExc_OSError); + return NULL; + } + return PyUnicode_FromString(buf); +} + +#ifdef ENABLE_IPV6 +/* Convert IPv6 sockaddr to a Python str. */ + +static PyObject * +make_ipv6_addr(const struct sockaddr_in6 *addr) +{ + char buf[INET6_ADDRSTRLEN]; + if (inet_ntop(AF_INET6, &addr->sin6_addr, buf, sizeof(buf)) == NULL) { + PyErr_SetFromErrno(PyExc_OSError); + return NULL; + } + return PyUnicode_FromString(buf); +} +#endif + static PyObject* unparse_address(LPSOCKADDR Address, DWORD Length) { - // An IPv6 address has a maximum length of 39 characters - char AddressString[40]; - unsigned int port; - PVOID pSinAddr; + /* The function is adopted from mocketmodule.c makesockaddr()*/ switch(Address->sa_family) { - case AF_INET: - port = ntohs(((SOCKADDR_IN*)Address)->sin_port); - pSinAddr = &((SOCKADDR_IN*)Address)->sin_addr; - break; - case AF_INET6: - port = ntohs(((SOCKADDR_IN6*)Address)->sin6_port); - pSinAddr = &((SOCKADDR_IN6*)Address)->sin6_addr; - break; - default: - return SetFromWindowsErr(ERROR_INVALID_PARAMETER); + case AF_INET: + { + const struct sockaddr_in *a = (const struct sockaddr_in *)Address; + PyObject *addrobj = make_ipv4_addr(a); + PyObject *ret = NULL; + if (addrobj) { + ret = Py_BuildValue("Oi", addrobj, ntohs(a->sin_port)); + Py_DECREF(addrobj); + } + return ret; + } +#ifdef ENABLE_IPV6 + case AF_INET6: + { + const struct sockaddr_in6 *a = (const struct sockaddr_in6 *)Address; + PyObject *addrobj = make_ipv6_addr(a); + PyObject *ret = NULL; + if (addrobj) { + ret = Py_BuildValue("OiII", + addrobj, + ntohs(a->sin6_port), + ntohl(a->sin6_flowinfo), + a->sin6_scope_id); + Py_DECREF(addrobj); + } + return ret; + } +#endif /* ENABLE_IPV6 */ + default: + { + return SetFromWindowsErr(ERROR_INVALID_PARAMETER); + } } - - if (!inet_ntop(Address->sa_family, pSinAddr, AddressString, - sizeof(AddressString))) - { - return SetFromWindowsErr(WSAGetLastError()); - } - - return Py_BuildValue("(sH)", AddressString, port); } PyDoc_STRVAR( @@ -791,7 +831,7 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) return NULL; } - // The result is a two item tuple: (message, (address, port)) + // The result is a two item tuple: (message, address) self->read_from.result = PyTuple_New(2); if (self->read_from.result == NULL) { Py_CLEAR(addr); @@ -801,7 +841,7 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) // first item: message PyTuple_SET_ITEM(self->read_from.result, 0, self->read_from.buffer); - // second item: tuple(address, port) + // second item: address PyTuple_SET_ITEM(self->read_from.result, 1, addr); Py_INCREF(self->read_from.result); @@ -1410,7 +1450,7 @@ PyDoc_STRVAR( "Connect to the pipe for asynchronous I/O (overlapped)."); static PyObject * -ConnectPipe(OverlappedObject *self, PyObject *args) +overlapped_ConnectPipe(OverlappedObject *self, PyObject *args) { PyObject *AddressObj; wchar_t *Address; @@ -1578,12 +1618,12 @@ Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) ERROR_SUCCESS); switch(err) { - case ERROR_SUCCESS: - case ERROR_IO_PENDING: - Py_RETURN_NONE; - default: - self->type = TYPE_NOT_STARTED; - return SetFromWindowsErr(err); + case ERROR_SUCCESS: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); } } @@ -1774,7 +1814,7 @@ static PyMethodDef overlapped_functions[] = { METH_VARARGS, SetEvent_doc}, {"ResetEvent", overlapped_ResetEvent, METH_VARARGS, ResetEvent_doc}, - {"ConnectPipe", ConnectPipe, + {"ConnectPipe", overlapped_ConnectPipe, METH_VARARGS, ConnectPipe_doc}, {"WSAConnect", overlapped_WSAConnect, METH_VARARGS, overlapped_WSAConnect_doc}, From d5fed183e2c41a3719e4dd08c1e0ac8bf90a19ba Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 17:37:11 +0300 Subject: [PATCH 24/42] Revert test --- Lib/test/test_asyncio/test_windows_events.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index 05f85159be0cd5..e201a0696796d6 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -100,9 +100,11 @@ async def _test_pipe(self): clients = [] for i in range(5): - stream_reader = asyncio.StreamReader(loop=self.loop) + stream_reader = asyncio.StreamReader(loop=self.loop, + _asyncio_internal=True) protocol = asyncio.StreamReaderProtocol(stream_reader, - loop=self.loop) + loop=self.loop, + _asyncio_internal=True) trans, proto = await self.loop.create_pipe_connection( lambda: protocol, ADDRESS) self.assertIsInstance(trans, asyncio.Transport) From 6e505674da99d44049261242697a4ed4dc2c0a52 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 17:43:17 +0300 Subject: [PATCH 25/42] Fix spaces --- Modules/overlapped.c | 146 +++++++++++++++++++++---------------------- 1 file changed, 73 insertions(+), 73 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 97ff39d78999e0..e1a4e10e570d4d 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -582,30 +582,30 @@ static int Overlapped_clear(OverlappedObject *self) { switch (self->type) { - case TYPE_READ: - case TYPE_ACCEPT: - Py_CLEAR(self->allocated_buffer); - break; - case TYPE_READ_FROM: - // An initial call to WSARecvFrom will only allocate the buffer. - // The result tuple of (message, address) is only - // allocated _after_ a message has been received. - if(self->read_from.result) { - // We've received a message, free the result tuple. - Py_CLEAR(self->read_from.result); - } - if(self->read_from.buffer) { - Py_CLEAR(self->read_from.buffer); - } - break; - case TYPE_WRITE: - case TYPE_WRITE_TO: - case TYPE_READINTO: - if (self->user_buffer.obj) { - PyBuffer_Release(&self->user_buffer); - } - break; - } + case TYPE_READ: + case TYPE_ACCEPT: + Py_CLEAR(self->allocated_buffer); + break; + case TYPE_READ_FROM: + // An initial call to WSARecvFrom will only allocate the buffer. + // The result tuple of (message, address) is only + // allocated _after_ a message has been received. + if(self->read_from.result) { + // We've received a message, free the result tuple. + Py_CLEAR(self->read_from.result); + } + if(self->read_from.buffer) { + Py_CLEAR(self->read_from.buffer); + } + break; + case TYPE_WRITE: + case TYPE_WRITE_TO: + case TYPE_READINTO: + if (self->user_buffer.obj) { + PyBuffer_Release(&self->user_buffer); + } + break; + } self->type = TYPE_NOT_STARTED; return 0; } @@ -658,12 +658,12 @@ Overlapped_dealloc(OverlappedObject *self) static PyObject * make_ipv4_addr(const struct sockaddr_in *addr) { - char buf[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, &addr->sin_addr, buf, sizeof(buf)) == NULL) { - PyErr_SetFromErrno(PyExc_OSError); - return NULL; - } - return PyUnicode_FromString(buf); + char buf[INET_ADDRSTRLEN]; + if (inet_ntop(AF_INET, &addr->sin_addr, buf, sizeof(buf)) == NULL) { + PyErr_SetFromErrno(PyExc_OSError); + return NULL; + } + return PyUnicode_FromString(buf); } #ifdef ENABLE_IPV6 @@ -672,53 +672,53 @@ make_ipv4_addr(const struct sockaddr_in *addr) static PyObject * make_ipv6_addr(const struct sockaddr_in6 *addr) { - char buf[INET6_ADDRSTRLEN]; - if (inet_ntop(AF_INET6, &addr->sin6_addr, buf, sizeof(buf)) == NULL) { - PyErr_SetFromErrno(PyExc_OSError); - return NULL; - } - return PyUnicode_FromString(buf); + char buf[INET6_ADDRSTRLEN]; + if (inet_ntop(AF_INET6, &addr->sin6_addr, buf, sizeof(buf)) == NULL) { + PyErr_SetFromErrno(PyExc_OSError); + return NULL; + } + return PyUnicode_FromString(buf); } #endif static PyObject* unparse_address(LPSOCKADDR Address, DWORD Length) { - /* The function is adopted from mocketmodule.c makesockaddr()*/ + /* The function is adopted from mocketmodule.c makesockaddr()*/ switch(Address->sa_family) { - case AF_INET: - { - const struct sockaddr_in *a = (const struct sockaddr_in *)Address; - PyObject *addrobj = make_ipv4_addr(a); - PyObject *ret = NULL; - if (addrobj) { - ret = Py_BuildValue("Oi", addrobj, ntohs(a->sin_port)); - Py_DECREF(addrobj); - } - return ret; - } + case AF_INET: + { + const struct sockaddr_in *a = (const struct sockaddr_in *)Address; + PyObject *addrobj = make_ipv4_addr(a); + PyObject *ret = NULL; + if (addrobj) { + ret = Py_BuildValue("Oi", addrobj, ntohs(a->sin_port)); + Py_DECREF(addrobj); + } + return ret; + } #ifdef ENABLE_IPV6 - case AF_INET6: - { - const struct sockaddr_in6 *a = (const struct sockaddr_in6 *)Address; - PyObject *addrobj = make_ipv6_addr(a); - PyObject *ret = NULL; - if (addrobj) { - ret = Py_BuildValue("OiII", - addrobj, - ntohs(a->sin6_port), - ntohl(a->sin6_flowinfo), - a->sin6_scope_id); - Py_DECREF(addrobj); - } - return ret; - } + case AF_INET6: + { + const struct sockaddr_in6 *a = (const struct sockaddr_in6 *)Address; + PyObject *addrobj = make_ipv6_addr(a); + PyObject *ret = NULL; + if (addrobj) { + ret = Py_BuildValue("OiII", + addrobj, + ntohs(a->sin6_port), + ntohl(a->sin6_flowinfo), + a->sin6_scope_id); + Py_DECREF(addrobj); + } + return ret; + } #endif /* ENABLE_IPV6 */ - default: - { - return SetFromWindowsErr(ERROR_INVALID_PARAMETER); - } + default: + { + return SetFromWindowsErr(ERROR_INVALID_PARAMETER); + } } } @@ -1618,12 +1618,12 @@ Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) ERROR_SUCCESS); switch(err) { - case ERROR_SUCCESS: - case ERROR_IO_PENDING: - Py_RETURN_NONE; - default: - self->type = TYPE_NOT_STARTED; - return SetFromWindowsErr(err); + case ERROR_SUCCESS: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); } } From 45e2b0390ab767b8818e80ab3fa054bb11e9eaa1 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 17:47:17 +0300 Subject: [PATCH 26/42] Fix spaces --- Modules/overlapped.c | 106 +++++++++++++++++++++---------------------- 1 file changed, 53 insertions(+), 53 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index e1a4e10e570d4d..ddd9ad94b992f6 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -582,30 +582,33 @@ static int Overlapped_clear(OverlappedObject *self) { switch (self->type) { - case TYPE_READ: - case TYPE_ACCEPT: - Py_CLEAR(self->allocated_buffer); - break; - case TYPE_READ_FROM: - // An initial call to WSARecvFrom will only allocate the buffer. - // The result tuple of (message, address) is only - // allocated _after_ a message has been received. - if(self->read_from.result) { - // We've received a message, free the result tuple. - Py_CLEAR(self->read_from.result); - } - if(self->read_from.buffer) { - Py_CLEAR(self->read_from.buffer); - } - break; - case TYPE_WRITE: - case TYPE_WRITE_TO: - case TYPE_READINTO: - if (self->user_buffer.obj) { - PyBuffer_Release(&self->user_buffer); - } - break; + case TYPE_READ: + case TYPE_ACCEPT: { + Py_CLEAR(self->allocated_buffer); + break; + } + case TYPE_READ_FROM: { + // An initial call to WSARecvFrom will only allocate the buffer. + // The result tuple of (message, address) is only + // allocated _after_ a message has been received. + if(self->read_from.result) { + // We've received a message, free the result tuple. + Py_CLEAR(self->read_from.result); + } + if(self->read_from.buffer) { + Py_CLEAR(self->read_from.buffer); + } + break; + } + case TYPE_WRITE: + case TYPE_WRITE_TO: + case TYPE_READINTO: { + if (self->user_buffer.obj) { + PyBuffer_Release(&self->user_buffer); + } + break; } + } self->type = TYPE_NOT_STARTED; return 0; } @@ -687,38 +690,35 @@ unparse_address(LPSOCKADDR Address, DWORD Length) /* The function is adopted from mocketmodule.c makesockaddr()*/ switch(Address->sa_family) { - case AF_INET: - { - const struct sockaddr_in *a = (const struct sockaddr_in *)Address; - PyObject *addrobj = make_ipv4_addr(a); - PyObject *ret = NULL; - if (addrobj) { - ret = Py_BuildValue("Oi", addrobj, ntohs(a->sin_port)); - Py_DECREF(addrobj); - } - return ret; - } + case AF_INET: { + const struct sockaddr_in *a = (const struct sockaddr_in *)Address; + PyObject *addrobj = make_ipv4_addr(a); + PyObject *ret = NULL; + if (addrobj) { + ret = Py_BuildValue("Oi", addrobj, ntohs(a->sin_port)); + Py_DECREF(addrobj); + } + return ret; + } #ifdef ENABLE_IPV6 - case AF_INET6: - { - const struct sockaddr_in6 *a = (const struct sockaddr_in6 *)Address; - PyObject *addrobj = make_ipv6_addr(a); - PyObject *ret = NULL; - if (addrobj) { - ret = Py_BuildValue("OiII", - addrobj, - ntohs(a->sin6_port), - ntohl(a->sin6_flowinfo), - a->sin6_scope_id); - Py_DECREF(addrobj); - } - return ret; - } + case AF_INET6: { + const struct sockaddr_in6 *a = (const struct sockaddr_in6 *)Address; + PyObject *addrobj = make_ipv6_addr(a); + PyObject *ret = NULL; + if (addrobj) { + ret = Py_BuildValue("OiII", + addrobj, + ntohs(a->sin6_port), + ntohl(a->sin6_flowinfo), + a->sin6_scope_id); + Py_DECREF(addrobj); + } + return ret; + } #endif /* ENABLE_IPV6 */ - default: - { - return SetFromWindowsErr(ERROR_INVALID_PARAMETER); - } + default: { + return SetFromWindowsErr(ERROR_INVALID_PARAMETER); + } } } From 87679d78322e4e524fa93e64bc205a7fbd22700c Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 17:49:05 +0300 Subject: [PATCH 27/42] Fix styling --- Modules/overlapped.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index ddd9ad94b992f6..a841c9bc458c06 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1510,7 +1510,7 @@ Overlapped_traverse(OverlappedObject *self, visitproc visit, void *arg) // UDP functions PyDoc_STRVAR( - overlapped_WSAConnect_doc, + WSAConnect_doc, "WSAConnect(client_handle, address_as_bytes) -> Overlapped[None]\n\n" "Bind a remote address to a connectionless (UDP) socket"); @@ -1817,7 +1817,7 @@ static PyMethodDef overlapped_functions[] = { {"ConnectPipe", overlapped_ConnectPipe, METH_VARARGS, ConnectPipe_doc}, {"WSAConnect", overlapped_WSAConnect, - METH_VARARGS, overlapped_WSAConnect_doc}, + METH_VARARGS, WSAConnect_doc}, {NULL} }; From f3dab42aa483972cf71fbcf6a1d8290f8ed00922 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 18:26:49 +0300 Subject: [PATCH 28/42] buffer -> allocated_buffer --- Modules/overlapped.c | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 97ff39d78999e0..d3f28cecc6068d 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -63,7 +63,7 @@ typedef struct { // A (buffer, (host, port)) tuple PyObject *result; // The actual read buffer - PyObject *buffer; + PyObject *allocated_buffer; struct sockaddr_in6 address; int address_length; } read_from; @@ -594,8 +594,8 @@ Overlapped_clear(OverlappedObject *self) // We've received a message, free the result tuple. Py_CLEAR(self->read_from.result); } - if(self->read_from.buffer) { - Py_CLEAR(self->read_from.buffer); + if(self->read_from.allocated_buffer) { + Py_CLEAR(self->read_from.allocated_buffer); } break; case TYPE_WRITE: @@ -796,7 +796,7 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) } else if (self->type == TYPE_READ_FROM && (self->read_from.result != NULL || - self->read_from.buffer != NULL)) + self->read_from.allocated_buffer != NULL)) { break; } @@ -815,10 +815,11 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) Py_INCREF(self->allocated_buffer); return self->allocated_buffer; case TYPE_READ_FROM: - assert(PyBytes_CheckExact(self->read_from.buffer)); + assert(PyBytes_CheckExact(self->read_from.allocated_buffer)); - if (transferred != PyBytes_GET_SIZE(self->read_from.buffer) && - _PyBytes_Resize(&self->read_from.buffer, transferred)) + if (transferred != PyBytes_GET_SIZE( + self->read_from.allocated_buffer) && + _PyBytes_Resize(&self->read_from.allocated_buffer, transferred)) { return NULL; } @@ -840,12 +841,12 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) // first item: message PyTuple_SET_ITEM(self->read_from.result, 0, - self->read_from.buffer); + self->read_from.allocated_buffer); // second item: address PyTuple_SET_ITEM(self->read_from.result, 1, addr); Py_INCREF(self->read_from.result); - Py_INCREF(self->read_from.buffer); + Py_INCREF(self->read_from.allocated_buffer); return self->read_from.result; default: return PyLong_FromUnsignedLong((unsigned long) transferred); @@ -1670,7 +1671,7 @@ Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args) self->type = TYPE_READ_FROM; self->handle = handle; - self->read_from.buffer = buf; + self->read_from.allocated_buffer = buf; memset(&self->read_from.address, 0, sizeof(self->read_from.address)); self->read_from.address_length = sizeof(self->read_from.address); From c17a3777be18645dcfb566b2fc4cfa2bc3bbefca Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 18:42:24 +0300 Subject: [PATCH 29/42] Postfix --- Modules/overlapped.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 1de743af4435ae..0185e1a0abe612 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -595,8 +595,8 @@ Overlapped_clear(OverlappedObject *self) // We've received a message, free the result tuple. Py_CLEAR(self->read_from.result); } - if(self->read_from.buffer) { - Py_CLEAR(self->read_from.buffer); + if(self->read_from.allocated_buffer) { + Py_CLEAR(self->read_from.allocated_buffer); } break; } @@ -1451,7 +1451,7 @@ PyDoc_STRVAR( "Connect to the pipe for asynchronous I/O (overlapped)."); static PyObject * -overlapped_ConnectPipe(OverlappedObject *self, PyObject *args) +overlapped_ConnectPipe(PyObject *self, PyObject *args) { PyObject *AddressObj; wchar_t *Address; From 2b44c36f1ca1d5562f0391fc39970634c8c9cbe1 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 18:47:22 +0300 Subject: [PATCH 30/42] Fix style --- Modules/overlapped.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 0185e1a0abe612..d7ddba321823c4 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1619,12 +1619,12 @@ Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) ERROR_SUCCESS); switch(err) { - case ERROR_SUCCESS: - case ERROR_IO_PENDING: - Py_RETURN_NONE; - default: - self->type = TYPE_NOT_STARTED; - return SetFromWindowsErr(err); + case ERROR_SUCCESS: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); } } From 2bfb099ddf0e9ee1ba36a8f9c1092cdd38b82548 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 18:52:07 +0300 Subject: [PATCH 31/42] Support TYPE_READ_FROM in tp_traverse slot --- Modules/overlapped.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index d7ddba321823c4..1e1ec280de2c0b 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1504,6 +1504,13 @@ Overlapped_traverse(OverlappedObject *self, visitproc visit, void *arg) Py_VISIT(&self->user_buffer.obj); } break; + case TYPE_READ_FROM: + if(self->read_from.result) { + Py_VISIT(self->read_from.result); + } + if(self->read_from.allocated_buffer) { + Py_VISIT(self->read_from.allocated_buffer); + } } return 0; } From d033f3e797d5ed633c069d362cc350c195368ffc Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 20 May 2019 18:55:47 +0300 Subject: [PATCH 32/42] Fix another memory leak --- Modules/overlapped.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 1e1ec280de2c0b..a0c6a1e0fa1ac6 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -840,13 +840,11 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) } // first item: message + Py_INCREF(self->read_from.allocated_buffer); PyTuple_SET_ITEM(self->read_from.result, 0, self->read_from.allocated_buffer); // second item: address PyTuple_SET_ITEM(self->read_from.result, 1, addr); - - Py_INCREF(self->read_from.result); - Py_INCREF(self->read_from.allocated_buffer); return self->read_from.result; default: return PyLong_FromUnsignedLong((unsigned long) transferred); From 350352691bdd6f11cb3df9d8abe99303346bad21 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 22 May 2019 16:07:28 +0300 Subject: [PATCH 33/42] Fix ref count --- Lib/asyncio/windows_events.py | 2 +- Modules/overlapped.c | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index f22b0ad766e2e8..ec8c25c5ed8e54 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -651,7 +651,7 @@ async def connect_pipe(self, address): # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) - await tasks.sleep(delay, loop=self._loop) + await tasks.sleep(delay) return windows_utils.PipeHandle(handle) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index a0c6a1e0fa1ac6..c46f0dce950259 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -845,6 +845,8 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) self->read_from.allocated_buffer); // second item: address PyTuple_SET_ITEM(self->read_from.result, 1, addr); + + Py_INCREF(self->read_from.result); return self->read_from.result; default: return PyLong_FromUnsignedLong((unsigned long) transferred); From 227a390a50e5c33d796f9ebf25fda37edca358e6 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Thu, 23 May 2019 17:38:57 +0300 Subject: [PATCH 34/42] Add datagram mocking tests --- Lib/test/test_asyncio/test_proactor_events.py | 289 ++++++++++++++++++ 1 file changed, 289 insertions(+) diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index e094d080770c59..ddaa99925532cb 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -726,6 +726,295 @@ def test_pause_resume_reading(self): self.assertFalse(tr.is_reading()) +class ProactorDatagramTransportTests(test_utils.TestCase): + + def setUp(self): + super().setUp() + self.loop = self.new_test_loop() + self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) + self.sock = mock.Mock(spec_set=socket.socket) + self.sock.fileno.return_value = 7 + + def datagram_transport(self, address=None): + self.sock.getpeername.side_effect = None if address else OSError + transport = _SelectorDatagramTransport(self.loop, self.sock, + self.protocol, + address=address) + self.addCleanup(close_transport, transport) + return transport + + def test_read_ready(self): + transport = self.datagram_transport() + + self.sock.recvfrom.return_value = (b'data', ('0.0.0.0', 1234)) + transport._read_ready() + + self.protocol.datagram_received.assert_called_with( + b'data', ('0.0.0.0', 1234)) + + def test_read_ready_tryagain(self): + transport = self.datagram_transport() + + self.sock.recvfrom.side_effect = BlockingIOError + transport._fatal_error = mock.Mock() + transport._read_ready() + + self.assertFalse(transport._fatal_error.called) + + def test_read_ready_err(self): + transport = self.datagram_transport() + + err = self.sock.recvfrom.side_effect = RuntimeError() + transport._fatal_error = mock.Mock() + transport._read_ready() + + transport._fatal_error.assert_called_with( + err, + 'Fatal read error on datagram transport') + + def test_read_ready_oserr(self): + transport = self.datagram_transport() + + err = self.sock.recvfrom.side_effect = OSError() + transport._fatal_error = mock.Mock() + transport._read_ready() + + self.assertFalse(transport._fatal_error.called) + self.protocol.error_received.assert_called_with(err) + + def test_sendto(self): + data = b'data' + transport = self.datagram_transport() + transport.sendto(data, ('0.0.0.0', 1234)) + self.assertTrue(self.sock.sendto.called) + self.assertEqual( + self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234))) + + def test_sendto_bytearray(self): + data = bytearray(b'data') + transport = self.datagram_transport() + transport.sendto(data, ('0.0.0.0', 1234)) + self.assertTrue(self.sock.sendto.called) + self.assertEqual( + self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234))) + + def test_sendto_memoryview(self): + data = memoryview(b'data') + transport = self.datagram_transport() + transport.sendto(data, ('0.0.0.0', 1234)) + self.assertTrue(self.sock.sendto.called) + self.assertEqual( + self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234))) + + def test_sendto_no_data(self): + transport = self.datagram_transport() + transport._buffer.append((b'data', ('0.0.0.0', 12345))) + transport.sendto(b'', ()) + self.assertFalse(self.sock.sendto.called) + self.assertEqual( + [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) + + def test_sendto_buffer(self): + transport = self.datagram_transport() + transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + transport.sendto(b'data2', ('0.0.0.0', 12345)) + self.assertFalse(self.sock.sendto.called) + self.assertEqual( + [(b'data1', ('0.0.0.0', 12345)), + (b'data2', ('0.0.0.0', 12345))], + list(transport._buffer)) + + def test_sendto_buffer_bytearray(self): + data2 = bytearray(b'data2') + transport = self.datagram_transport() + transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + transport.sendto(data2, ('0.0.0.0', 12345)) + self.assertFalse(self.sock.sendto.called) + self.assertEqual( + [(b'data1', ('0.0.0.0', 12345)), + (b'data2', ('0.0.0.0', 12345))], + list(transport._buffer)) + self.assertIsInstance(transport._buffer[1][0], bytes) + + def test_sendto_buffer_memoryview(self): + data2 = memoryview(b'data2') + transport = self.datagram_transport() + transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + transport.sendto(data2, ('0.0.0.0', 12345)) + self.assertFalse(self.sock.sendto.called) + self.assertEqual( + [(b'data1', ('0.0.0.0', 12345)), + (b'data2', ('0.0.0.0', 12345))], + list(transport._buffer)) + self.assertIsInstance(transport._buffer[1][0], bytes) + + def test_sendto_tryagain(self): + data = b'data' + + self.sock.sendto.side_effect = BlockingIOError + + transport = self.datagram_transport() + transport.sendto(data, ('0.0.0.0', 12345)) + + self.loop.assert_writer(7, transport._sendto_ready) + self.assertEqual( + [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) + + @mock.patch('asyncio.selector_events.logger') + def test_sendto_exception(self, m_log): + data = b'data' + err = self.sock.sendto.side_effect = RuntimeError() + + transport = self.datagram_transport() + transport._fatal_error = mock.Mock() + transport.sendto(data, ()) + + self.assertTrue(transport._fatal_error.called) + transport._fatal_error.assert_called_with( + err, + 'Fatal write error on datagram transport') + transport._conn_lost = 1 + + transport._address = ('123',) + transport.sendto(data) + transport.sendto(data) + transport.sendto(data) + transport.sendto(data) + transport.sendto(data) + m_log.warning.assert_called_with('socket.send() raised exception.') + + def test_sendto_error_received(self): + data = b'data' + + self.sock.sendto.side_effect = ConnectionRefusedError + + transport = self.datagram_transport() + transport._fatal_error = mock.Mock() + transport.sendto(data, ()) + + self.assertEqual(transport._conn_lost, 0) + self.assertFalse(transport._fatal_error.called) + + def test_sendto_error_received_connected(self): + data = b'data' + + self.sock.send.side_effect = ConnectionRefusedError + + transport = self.datagram_transport(address=('0.0.0.0', 1)) + transport._fatal_error = mock.Mock() + transport.sendto(data) + + self.assertFalse(transport._fatal_error.called) + self.assertTrue(self.protocol.error_received.called) + + def test_sendto_str(self): + transport = self.datagram_transport() + self.assertRaises(TypeError, transport.sendto, 'str', ()) + + def test_sendto_connected_addr(self): + transport = self.datagram_transport(address=('0.0.0.0', 1)) + self.assertRaises( + ValueError, transport.sendto, b'str', ('0.0.0.0', 2)) + + def test_sendto_closing(self): + transport = self.datagram_transport(address=(1,)) + transport.close() + self.assertEqual(transport._conn_lost, 1) + transport.sendto(b'data', (1,)) + self.assertEqual(transport._conn_lost, 2) + + def test_sendto_ready(self): + data = b'data' + self.sock.sendto.return_value = len(data) + + transport = self.datagram_transport() + transport._buffer.append((data, ('0.0.0.0', 12345))) + self.loop._add_writer(7, transport._sendto_ready) + transport._sendto_ready() + self.assertTrue(self.sock.sendto.called) + self.assertEqual( + self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345))) + self.assertFalse(self.loop.writers) + + def test_sendto_ready_closing(self): + data = b'data' + self.sock.send.return_value = len(data) + + transport = self.datagram_transport() + transport._closing = True + transport._buffer.append((data, ())) + self.loop._add_writer(7, transport._sendto_ready) + transport._sendto_ready() + self.sock.sendto.assert_called_with(data, ()) + self.assertFalse(self.loop.writers) + self.sock.close.assert_called_with() + self.protocol.connection_lost.assert_called_with(None) + + def test_sendto_ready_no_data(self): + transport = self.datagram_transport() + self.loop._add_writer(7, transport._sendto_ready) + transport._sendto_ready() + self.assertFalse(self.sock.sendto.called) + self.assertFalse(self.loop.writers) + + def test_sendto_ready_tryagain(self): + self.sock.sendto.side_effect = BlockingIOError + + transport = self.datagram_transport() + transport._buffer.extend([(b'data1', ()), (b'data2', ())]) + self.loop._add_writer(7, transport._sendto_ready) + transport._sendto_ready() + + self.loop.assert_writer(7, transport._sendto_ready) + self.assertEqual( + [(b'data1', ()), (b'data2', ())], + list(transport._buffer)) + + def test_sendto_ready_exception(self): + err = self.sock.sendto.side_effect = RuntimeError() + + transport = self.datagram_transport() + transport._fatal_error = mock.Mock() + transport._buffer.append((b'data', ())) + transport._sendto_ready() + + transport._fatal_error.assert_called_with( + err, + 'Fatal write error on datagram transport') + + def test_sendto_ready_error_received(self): + self.sock.sendto.side_effect = ConnectionRefusedError + + transport = self.datagram_transport() + transport._fatal_error = mock.Mock() + transport._buffer.append((b'data', ())) + transport._sendto_ready() + + self.assertFalse(transport._fatal_error.called) + + def test_sendto_ready_error_received_connection(self): + self.sock.send.side_effect = ConnectionRefusedError + + transport = self.datagram_transport(address=('0.0.0.0', 1)) + transport._fatal_error = mock.Mock() + transport._buffer.append((b'data', ())) + transport._sendto_ready() + + self.assertFalse(transport._fatal_error.called) + self.assertTrue(self.protocol.error_received.called) + + @mock.patch('asyncio.base_events.logger.error') + def test_fatal_error_connected(self, m_exc): + transport = self.datagram_transport(address=('0.0.0.0', 1)) + err = ConnectionRefusedError() + transport._fatal_error(err) + self.assertFalse(self.protocol.error_received.called) + m_exc.assert_called_with( + test_utils.MockPattern( + 'Fatal error on transport\nprotocol:.*\ntransport:.*'), + exc_info=(ConnectionRefusedError, MOCK_ANY, MOCK_ANY)) + + class BaseProactorEventLoopTests(test_utils.TestCase): def setUp(self): From eb474a1e1353f6a6989d322c50a6d307596503c3 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Thu, 23 May 2019 18:25:13 +0300 Subject: [PATCH 35/42] Work on mock tests --- Lib/asyncio/proactor_events.py | 11 ++--- Lib/test/test_asyncio/test_proactor_events.py | 40 ++++++++++--------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 30420149f5d7ca..3dc6c8bd532a46 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -482,7 +482,8 @@ def sendto(self, data, addr=None): self._conn_lost += 1 return - self._buffer.appendleft((data, addr)) + # Ensure that what we buffer is immutable. + self._buffer.append((bytes(data), addr)) if self._write_fut is None: # No current write operations are active, kick one off @@ -506,11 +507,11 @@ def _loop_writing(self, fut=None): self._loop.call_soon(self._call_connection_lost, None) return - data, addr = self._buffer.pop() + data, addr = self._buffer.popleft() if self._address is not None: self._write_fut = self._loop._proactor.send(self._sock, data) else: - self._write_fut = self._loop._proactor.sendto(self._sock, data, addr=addr) + self._write_fut = self._loop._proactor.sendto(self._sock, data, addr) except OSError as exc: self._protocol.error_received(exc) except Exception as exc: @@ -544,9 +545,9 @@ def _loop_reading(self, fut=None): if self._conn_lost: return if self._address is not None: - self._read_fut = self._loop._proactor.recv(self._sock, 4096) + self._read_fut = self._loop._proactor.recv(self._sock, self.max_size) else: - self._read_fut = self._loop._proactor.recvfrom(self._sock, 4096) + self._read_fut = self._loop._proactor.recvfrom(self._sock, self.max_size) except OSError as exc: self._protocol.error_received(exc) except exceptions.CancelledError: diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index ddaa99925532cb..a7d3d05a84ae78 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -731,13 +731,15 @@ class ProactorDatagramTransportTests(test_utils.TestCase): def setUp(self): super().setUp() self.loop = self.new_test_loop() + self.proactor = mock.Mock() + self.loop._proactor = self.proactor self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) self.sock = mock.Mock(spec_set=socket.socket) self.sock.fileno.return_value = 7 def datagram_transport(self, address=None): self.sock.getpeername.side_effect = None if address else OSError - transport = _SelectorDatagramTransport(self.loop, self.sock, + transport = _ProactorDatagramTransport(self.loop, self.sock, self.protocol, address=address) self.addCleanup(close_transport, transport) @@ -746,8 +748,11 @@ def datagram_transport(self, address=None): def test_read_ready(self): transport = self.datagram_transport() - self.sock.recvfrom.return_value = (b'data', ('0.0.0.0', 1234)) + fut = self.loop.create_future() + fut.set_result((b'data', ('0.0.0.0', 1234))) + self.proactor.recv_from.return_value = fut transport._read_ready() + self.proactor.recvfrom.assert_called_with(self.sock, 256 * 1024) self.protocol.datagram_received.assert_called_with( b'data', ('0.0.0.0', 1234)) @@ -786,25 +791,21 @@ def test_sendto(self): data = b'data' transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) - self.assertTrue(self.sock.sendto.called) - self.assertEqual( - self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234))) + self.assertTrue(self.proactor.sendto.called) + self.proactor.sendto.assert_called_with( + self.sock, data, ('0.0.0.0', 1234)) def test_sendto_bytearray(self): data = bytearray(b'data') transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) - self.assertTrue(self.sock.sendto.called) - self.assertEqual( - self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234))) + self.assertEqual(transport._buffer, deque([data, ('0.0.0.0', 1234)])) def test_sendto_memoryview(self): data = memoryview(b'data') transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) - self.assertTrue(self.sock.sendto.called) - self.assertEqual( - self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234))) + self.assertEqual(transport._buffer, deque([data, ('0.0.0.0', 1234)])) def test_sendto_no_data(self): transport = self.datagram_transport() @@ -817,8 +818,9 @@ def test_sendto_no_data(self): def test_sendto_buffer(self): transport = self.datagram_transport() transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + transport._write_fut = object() transport.sendto(b'data2', ('0.0.0.0', 12345)) - self.assertFalse(self.sock.sendto.called) + self.assertFalse(self.proactor.sendto.called) self.assertEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], @@ -828,8 +830,9 @@ def test_sendto_buffer_bytearray(self): data2 = bytearray(b'data2') transport = self.datagram_transport() transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + transport._write_fut = object() transport.sendto(data2, ('0.0.0.0', 12345)) - self.assertFalse(self.sock.sendto.called) + self.assertFalse(self.proactor.sendto.called) self.assertEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], @@ -840,8 +843,9 @@ def test_sendto_buffer_memoryview(self): data2 = memoryview(b'data2') transport = self.datagram_transport() transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + transport._write_fut = object() transport.sendto(data2, ('0.0.0.0', 12345)) - self.assertFalse(self.sock.sendto.called) + self.assertFalse(self.proactor.sendto.called) self.assertEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], @@ -860,10 +864,10 @@ def test_sendto_tryagain(self): self.assertEqual( [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) - @mock.patch('asyncio.selector_events.logger') + @mock.patch('asyncio.proactor_events.logger') def test_sendto_exception(self, m_log): data = b'data' - err = self.sock.sendto.side_effect = RuntimeError() + err = self.proactor.sendto.side_effect = RuntimeError() transport = self.datagram_transport() transport._fatal_error = mock.Mock() @@ -881,7 +885,7 @@ def test_sendto_exception(self, m_log): transport.sendto(data) transport.sendto(data) transport.sendto(data) - m_log.warning.assert_called_with('socket.send() raised exception.') + m_log.warning.assert_called_with('socket.sendto() raised exception.') def test_sendto_error_received(self): data = b'data' @@ -898,7 +902,7 @@ def test_sendto_error_received(self): def test_sendto_error_received_connected(self): data = b'data' - self.sock.send.side_effect = ConnectionRefusedError + self.proactor.send.side_effect = ConnectionRefusedError transport = self.datagram_transport(address=('0.0.0.0', 1)) transport._fatal_error = mock.Mock() From 431805dae49374548b1b45c919b369dfb65d74fc Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Thu, 23 May 2019 18:26:07 +0300 Subject: [PATCH 36/42] Fix tests --- Lib/asyncio/proactor_events.py | 2 +- Lib/test/test_asyncio/test_proactor_events.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 3dc6c8bd532a46..f21fc6aa251026 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -511,7 +511,7 @@ def _loop_writing(self, fut=None): if self._address is not None: self._write_fut = self._loop._proactor.send(self._sock, data) else: - self._write_fut = self._loop._proactor.sendto(self._sock, data, addr) + self._write_fut = self._loop._proactor.sendto(self._sock, data, addr=addr) except OSError as exc: self._protocol.error_received(exc) except Exception as exc: diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index a7d3d05a84ae78..56903f1a94a178 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -793,7 +793,7 @@ def test_sendto(self): transport.sendto(data, ('0.0.0.0', 1234)) self.assertTrue(self.proactor.sendto.called) self.proactor.sendto.assert_called_with( - self.sock, data, ('0.0.0.0', 1234)) + self.sock, data, addr=('0.0.0.0', 1234)) def test_sendto_bytearray(self): data = bytearray(b'data') From 7d5f9100447bdc19aeddfbe72a6ccf042b00b3c0 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 25 May 2019 14:47:21 +0300 Subject: [PATCH 37/42] Better test coverage --- Lib/asyncio/proactor_events.py | 2 +- Lib/test/test_asyncio/test_proactor_events.py | 135 ++++-------------- 2 files changed, 25 insertions(+), 112 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index f21fc6aa251026..f916131ac83121 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -445,7 +445,7 @@ def _pipe_closed(self, fut): class _ProactorDatagramTransport(_ProactorBasePipeTransport): - + max_size = 256 * 1024 def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): self._address = address diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 56903f1a94a178..d1afd1b8d41bf3 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -4,6 +4,7 @@ import socket import unittest import sys +from collections import deque from unittest import mock import asyncio @@ -745,48 +746,6 @@ def datagram_transport(self, address=None): self.addCleanup(close_transport, transport) return transport - def test_read_ready(self): - transport = self.datagram_transport() - - fut = self.loop.create_future() - fut.set_result((b'data', ('0.0.0.0', 1234))) - self.proactor.recv_from.return_value = fut - transport._read_ready() - self.proactor.recvfrom.assert_called_with(self.sock, 256 * 1024) - - self.protocol.datagram_received.assert_called_with( - b'data', ('0.0.0.0', 1234)) - - def test_read_ready_tryagain(self): - transport = self.datagram_transport() - - self.sock.recvfrom.side_effect = BlockingIOError - transport._fatal_error = mock.Mock() - transport._read_ready() - - self.assertFalse(transport._fatal_error.called) - - def test_read_ready_err(self): - transport = self.datagram_transport() - - err = self.sock.recvfrom.side_effect = RuntimeError() - transport._fatal_error = mock.Mock() - transport._read_ready() - - transport._fatal_error.assert_called_with( - err, - 'Fatal read error on datagram transport') - - def test_read_ready_oserr(self): - transport = self.datagram_transport() - - err = self.sock.recvfrom.side_effect = OSError() - transport._fatal_error = mock.Mock() - transport._read_ready() - - self.assertFalse(transport._fatal_error.called) - self.protocol.error_received.assert_called_with(err) - def test_sendto(self): data = b'data' transport = self.datagram_transport() @@ -799,13 +758,17 @@ def test_sendto_bytearray(self): data = bytearray(b'data') transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) - self.assertEqual(transport._buffer, deque([data, ('0.0.0.0', 1234)])) + self.assertTrue(self.proactor.sendto.called) + self.proactor.sendto.assert_called_with( + self.sock, b'data', addr=('0.0.0.0', 1234)) def test_sendto_memoryview(self): data = memoryview(b'data') transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) - self.assertEqual(transport._buffer, deque([data, ('0.0.0.0', 1234)])) + self.assertTrue(self.proactor.sendto.called) + self.proactor.sendto.assert_called_with( + self.sock, b'data', addr=('0.0.0.0', 1234)) def test_sendto_no_data(self): transport = self.datagram_transport() @@ -852,18 +815,6 @@ def test_sendto_buffer_memoryview(self): list(transport._buffer)) self.assertIsInstance(transport._buffer[1][0], bytes) - def test_sendto_tryagain(self): - data = b'data' - - self.sock.sendto.side_effect = BlockingIOError - - transport = self.datagram_transport() - transport.sendto(data, ('0.0.0.0', 12345)) - - self.loop.assert_writer(7, transport._sendto_ready) - self.assertEqual( - [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) - @mock.patch('asyncio.proactor_events.logger') def test_sendto_exception(self, m_log): data = b'data' @@ -927,82 +878,44 @@ def test_sendto_closing(self): transport.sendto(b'data', (1,)) self.assertEqual(transport._conn_lost, 2) - def test_sendto_ready(self): - data = b'data' - self.sock.sendto.return_value = len(data) - - transport = self.datagram_transport() - transport._buffer.append((data, ('0.0.0.0', 12345))) - self.loop._add_writer(7, transport._sendto_ready) - transport._sendto_ready() - self.assertTrue(self.sock.sendto.called) - self.assertEqual( - self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345))) - self.assertFalse(self.loop.writers) - - def test_sendto_ready_closing(self): - data = b'data' - self.sock.send.return_value = len(data) - + def test__loop_writing_closing(self): transport = self.datagram_transport() transport._closing = True - transport._buffer.append((data, ())) - self.loop._add_writer(7, transport._sendto_ready) - transport._sendto_ready() - self.sock.sendto.assert_called_with(data, ()) - self.assertFalse(self.loop.writers) + transport._loop_writing() + self.assertIsNone(transport._write_fut) + test_utils.run_briefly(self.loop) self.sock.close.assert_called_with() self.protocol.connection_lost.assert_called_with(None) - def test_sendto_ready_no_data(self): - transport = self.datagram_transport() - self.loop._add_writer(7, transport._sendto_ready) - transport._sendto_ready() - self.assertFalse(self.sock.sendto.called) - self.assertFalse(self.loop.writers) - - def test_sendto_ready_tryagain(self): - self.sock.sendto.side_effect = BlockingIOError - - transport = self.datagram_transport() - transport._buffer.extend([(b'data1', ()), (b'data2', ())]) - self.loop._add_writer(7, transport._sendto_ready) - transport._sendto_ready() - - self.loop.assert_writer(7, transport._sendto_ready) - self.assertEqual( - [(b'data1', ()), (b'data2', ())], - list(transport._buffer)) - - def test_sendto_ready_exception(self): - err = self.sock.sendto.side_effect = RuntimeError() + def test__loop_writing_exception(self): + err = self.proactor.sendto.side_effect = RuntimeError() transport = self.datagram_transport() transport._fatal_error = mock.Mock() transport._buffer.append((b'data', ())) - transport._sendto_ready() + transport._loop_writing() transport._fatal_error.assert_called_with( err, 'Fatal write error on datagram transport') - def test_sendto_ready_error_received(self): - self.sock.sendto.side_effect = ConnectionRefusedError + def test__loop_writing_error_received(self): + self.proactor.sendto.side_effect = ConnectionRefusedError transport = self.datagram_transport() transport._fatal_error = mock.Mock() transport._buffer.append((b'data', ())) - transport._sendto_ready() + transport._loop_writing() self.assertFalse(transport._fatal_error.called) - def test_sendto_ready_error_received_connection(self): - self.sock.send.side_effect = ConnectionRefusedError + def test__loop_writing_error_received_connection(self): + self.proactor.send.side_effect = ConnectionRefusedError transport = self.datagram_transport(address=('0.0.0.0', 1)) transport._fatal_error = mock.Mock() transport._buffer.append((b'data', ())) - transport._sendto_ready() + transport._loop_writing() self.assertFalse(transport._fatal_error.called) self.assertTrue(self.protocol.error_received.called) @@ -1015,8 +928,8 @@ def test_fatal_error_connected(self, m_exc): self.assertFalse(self.protocol.error_received.called) m_exc.assert_called_with( test_utils.MockPattern( - 'Fatal error on transport\nprotocol:.*\ntransport:.*'), - exc_info=(ConnectionRefusedError, MOCK_ANY, MOCK_ANY)) + 'Fatal error on pipe transport\nprotocol:.*\ntransport:.*'), + exc_info=(ConnectionRefusedError, mock.ANY, mock.ANY)) class BaseProactorEventLoopTests(test_utils.TestCase): @@ -1180,7 +1093,7 @@ def test_datagram_loop_writing(self): def test_datagram_loop_reading(self): tr = self.datagram_transport() tr._loop_reading() - self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096) + self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024) self.assertFalse(self.protocol.datagram_received.called) self.assertFalse(self.protocol.error_received.called) close_transport(tr) @@ -1192,7 +1105,7 @@ def test_datagram_loop_reading_data(self): tr = self.datagram_transport() tr._read_fut = res tr._loop_reading(res) - self.loop._proactor.recvfrom.assert_called_with(self.sock, 4096) + self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024) self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068)) close_transport(tr) From 475be1f9441e45978452d88ed6b9f27f7bd72f95 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 25 May 2019 15:46:01 +0300 Subject: [PATCH 38/42] Support flow-control for writing operations --- Lib/asyncio/proactor_events.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index f916131ac83121..55b3b53aa49e4e 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -461,6 +461,9 @@ def __init__(self, loop, sock, protocol, address=None, def _set_extra(self, sock): _set_socket_extra(self, sock) + def get_write_buffer_size(self): + return sum(len(data) for data, _ in self._buffer) + def abort(self): self._force_close(None) @@ -490,6 +493,8 @@ def sendto(self, data, addr=None): self._loop_writing() # else: A write operation is already kicked off + self._maybe_pause_protocol() + def _loop_writing(self, fut=None): try: if self._conn_lost: @@ -509,15 +514,19 @@ def _loop_writing(self, fut=None): data, addr = self._buffer.popleft() if self._address is not None: - self._write_fut = self._loop._proactor.send(self._sock, data) + self._write_fut = self._loop._proactor.send(self._sock, + data) else: - self._write_fut = self._loop._proactor.sendto(self._sock, data, addr=addr) + self._write_fut = self._loop._proactor.sendto(self._sock, + data, + addr=addr) except OSError as exc: self._protocol.error_received(exc) except Exception as exc: self._fatal_error(exc, 'Fatal write error on datagram transport') else: self._write_fut.add_done_callback(self._loop_writing) + self._maybe_resume_protocol() def _loop_reading(self, fut=None): data = None @@ -545,9 +554,11 @@ def _loop_reading(self, fut=None): if self._conn_lost: return if self._address is not None: - self._read_fut = self._loop._proactor.recv(self._sock, self.max_size) + self._read_fut = self._loop._proactor.recv(self._sock, + self.max_size) else: - self._read_fut = self._loop._proactor.recvfrom(self._sock, self.max_size) + self._read_fut = self._loop._proactor.recvfrom(self._sock, + self.max_size) except OSError as exc: self._protocol.error_received(exc) except exceptions.CancelledError: From 3000f91af9a5421bba857589fd778e585d207f08 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 25 May 2019 16:07:23 +0300 Subject: [PATCH 39/42] make patchcheck --- Lib/test/test_asyncio/test_proactor_events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index d1afd1b8d41bf3..fc7ab2d32da601 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -751,7 +751,7 @@ def test_sendto(self): transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) self.assertTrue(self.proactor.sendto.called) - self.proactor.sendto.assert_called_with( + self.proactor.sendto.assert_called_with( self.sock, data, addr=('0.0.0.0', 1234)) def test_sendto_bytearray(self): @@ -759,7 +759,7 @@ def test_sendto_bytearray(self): transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) self.assertTrue(self.proactor.sendto.called) - self.proactor.sendto.assert_called_with( + self.proactor.sendto.assert_called_with( self.sock, b'data', addr=('0.0.0.0', 1234)) def test_sendto_memoryview(self): @@ -767,7 +767,7 @@ def test_sendto_memoryview(self): transport = self.datagram_transport() transport.sendto(data, ('0.0.0.0', 1234)) self.assertTrue(self.proactor.sendto.called) - self.proactor.sendto.assert_called_with( + self.proactor.sendto.assert_called_with( self.sock, b'data', addr=('0.0.0.0', 1234)) def test_sendto_no_data(self): From 802b92e7f1236377a4010f567e5b4ce7e899fd0c Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 25 May 2019 16:37:24 +0300 Subject: [PATCH 40/42] Support TYPE_WRITE_TO by tp_traverse --- Modules/overlapped.c | 1 + 1 file changed, 1 insertion(+) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index c46f0dce950259..aad531e478931f 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1499,6 +1499,7 @@ Overlapped_traverse(OverlappedObject *self, visitproc visit, void *arg) Py_VISIT(self->allocated_buffer); break; case TYPE_WRITE: + case TYPE_WRITE_TO: case TYPE_READINTO: if (self->user_buffer.obj) { Py_VISIT(&self->user_buffer.obj); From 2b8e51ea0b0e5f89f88c8026d62e6a56ba9cf137 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 26 May 2019 10:38:08 +0300 Subject: [PATCH 41/42] Update Lib/asyncio/proactor_events.py Co-Authored-By: Xtreak --- Lib/asyncio/proactor_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 55b3b53aa49e4e..ba9cf38713b0f4 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -469,7 +469,7 @@ def abort(self): def sendto(self, data, addr=None): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', + raise TypeError('data argument must be bytes-like object (%r)', type(data)) if not data: From fdd800105dfdd6b510e35b7458731639d0ce202e Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 28 May 2019 12:30:34 +0300 Subject: [PATCH 42/42] Reflect in tests the fact that OSError is not loggeg anymore --- Lib/test/test_asyncio/test_proactor_events.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index fc7ab2d32da601..2e9995d3280736 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -926,10 +926,7 @@ def test_fatal_error_connected(self, m_exc): err = ConnectionRefusedError() transport._fatal_error(err) self.assertFalse(self.protocol.error_received.called) - m_exc.assert_called_with( - test_utils.MockPattern( - 'Fatal error on pipe transport\nprotocol:.*\ntransport:.*'), - exc_info=(ConnectionRefusedError, mock.ANY, mock.ANY)) + m_exc.assert_not_called() class BaseProactorEventLoopTests(test_utils.TestCase):