Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
e9084a1
bpo-29883: Add UDP to Windows Proactor Event Loop
ameily Mar 25, 2017
1c72ea5
fix AppVeyor issues
ameily Dec 16, 2017
668d6d7
fixed UDP proactor tests
ameily Dec 16, 2017
05a704c
add missing cast to PyCFunction
ameily Dec 16, 2017
685ed0f
implement extra data for proactor UDP transport
ameily Dec 16, 2017
038739e
enable UDP proactor unit tests on Windows
ameily Dec 16, 2017
ae14118
fix bad rebase of datagram proactor tests
ameily Sep 15, 2018
93ce3f8
add news item for bpo29883
ameily Sep 15, 2018
781e2ef
fix bad rebase with missing finish_recv() in recv_into()
ameily Sep 15, 2018
8761fcd
change WSAConnect to a module method
ameily Sep 15, 2018
5e3d4d3
PEP 7 fixes and address PR feedback
ameily Sep 22, 2018
d25833d
fix build error
ameily Sep 22, 2018
16d2ce7
simplify calls to super()
ameily Sep 26, 2018
da40f28
address PR feedback
ameily Oct 3, 2018
2962f5c
Update asyncio-eventloop.rst
asvetlov Nov 8, 2018
ce73a32
address feedback from PR
ameily Dec 28, 2018
4211be9
fix bad rebase
ameily Mar 30, 2019
504ae83
address PR feedback
ameily Apr 5, 2019
58e9f8d
fix memory leak
ameily Apr 12, 2019
0c935b0
fix memory leak when receiving on UDP endpoint
ameily May 2, 2019
7cd4165
Merge branch 'fix-issue-29883' of git://github.com/ameily/cpython int…
asvetlov May 20, 2019
bb0c1d2
Use None address if recvfrom is unexpectedly finished
asvetlov May 20, 2019
43cd944
Use explicit check for None in self._address
asvetlov May 20, 2019
5eabe05
Return (addr, host, flow_info, scope_id) in unparse_address for IPv6
asvetlov May 20, 2019
d5fed18
Revert test
asvetlov May 20, 2019
22bc146
Merge branch 'master' into asyncio-proactor-udp
asvetlov May 20, 2019
6e50567
Fix spaces
asvetlov May 20, 2019
45e2b03
Fix spaces
asvetlov May 20, 2019
87679d7
Fix styling
asvetlov May 20, 2019
f3dab42
buffer -> allocated_buffer
asvetlov May 20, 2019
b8ac00a
Merge branch 'asyncio-proactor-udp' of github.com:asvetlov/cpython in…
asvetlov May 20, 2019
c17a377
Postfix
asvetlov May 20, 2019
2b44c36
Fix style
asvetlov May 20, 2019
2bfb099
Support TYPE_READ_FROM in tp_traverse slot
asvetlov May 20, 2019
d033f3e
Fix another memory leak
asvetlov May 20, 2019
3503526
Fix ref count
asvetlov May 22, 2019
30b588d
Merge branch 'master' into asyncio-proactor-udp
asvetlov May 22, 2019
227a390
Add datagram mocking tests
asvetlov May 23, 2019
eb474a1
Work on mock tests
asvetlov May 23, 2019
431805d
Fix tests
asvetlov May 23, 2019
7d5f910
Better test coverage
asvetlov May 25, 2019
96390b9
Merge branch 'master' into asyncio-proactor-udp
asvetlov May 25, 2019
475be1f
Support flow-control for writing operations
asvetlov May 25, 2019
3000f91
make patchcheck
asvetlov May 25, 2019
802b92e
Support TYPE_WRITE_TO by tp_traverse
asvetlov May 25, 2019
2b8e51e
Update Lib/asyncio/proactor_events.py
asvetlov May 26, 2019
ae4cd69
Merge branch 'master' into asyncio-proactor-udp
asvetlov May 28, 2019
927a429
Merge branch 'asyncio-proactor-udp' of github.com:asvetlov/cpython in…
asvetlov May 28, 2019
fdd8001
Reflect in tests the fact that OSError is not loggeg anymore
asvetlov May 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -504,15 +504,16 @@ Opening network connections
transport. If specified, *local_addr* and *remote_addr* should be omitted
(must be :const:`None`).

On Windows, with :class:`ProactorEventLoop`, this method is not supported.

See :ref:`UDP echo client protocol <asyncio-udp-echo-client-protocol>` and
:ref:`UDP echo server protocol <asyncio-udp-echo-server-protocol>` examples.

.. versionchanged:: 3.4.4
The *family*, *proto*, *flags*, *reuse_address*, *reuse_port,
*allow_broadcast*, and *sock* parameters were added.

.. versionchanged:: 3.8
Added support for Windows.

.. coroutinemethod:: loop.create_unix_connection(protocol_factory, \
path=None, \*, ssl=None, sock=None, \
server_hostname=None, ssl_handshake_timeout=None)
Expand Down
3 changes: 0 additions & 3 deletions Doc/library/asyncio-platforms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ All event loops on Windows do not support the following methods:

:class:`ProactorEventLoop` has the following limitations:

* The :meth:`loop.create_datagram_endpoint` method
is not supported.

* The :meth:`loop.add_reader` and :meth:`loop.add_writer`
methods are not supported.

Expand Down
169 changes: 153 additions & 16 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import socket
import warnings
import signal
import collections

from . import base_events
from . import constants
Expand All @@ -23,6 +24,24 @@
from .log import logger


def _set_socket_extra(transport, sock):
transport._extra['socket'] = trsock.TransportSocket(sock)

try:
transport._extra['sockname'] = sock.getsockname()
except socket.error:
if transport._loop.get_debug():
logger.warning(
"getsockname() failed on %r", sock, exc_info=True)

if 'peername' not in transport._extra:
try:
transport._extra['peername'] = sock.getpeername()
except socket.error:
# UDP sockets may not have a peer name
transport._extra['peername'] = None


class _ProactorBasePipeTransport(transports._FlowControlMixin,
transports.BaseTransport):
"""Base class for pipe and socket transports."""
Expand Down Expand Up @@ -430,6 +449,134 @@ def _pipe_closed(self, fut):
self.close()


class _ProactorDatagramTransport(_ProactorBasePipeTransport):
max_size = 256 * 1024
def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
self._address = address
self._empty_waiter = None
# We don't need to call _protocol.connection_made() since our base
# constructor does it for us.
super().__init__(loop, sock, protocol, waiter=waiter, extra=extra)

# The base constructor sets _buffer = None, so we set it here
self._buffer = collections.deque()
self._loop.call_soon(self._loop_reading)

def _set_extra(self, sock):
_set_socket_extra(self, sock)

def get_write_buffer_size(self):
return sum(len(data) for data, _ in self._buffer)

def abort(self):
self._force_close(None)

def sendto(self, data, addr=None):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be bytes-like object (%r)',
type(data))

if not data:
return

if self._address is not None and addr not in (None, self._address):
raise ValueError(
f'Invalid address: must be None or {self._address}')

if self._conn_lost and self._address:
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
logger.warning('socket.sendto() raised exception.')
self._conn_lost += 1
return

# Ensure that what we buffer is immutable.
self._buffer.append((bytes(data), addr))

if self._write_fut is None:
# No current write operations are active, kick one off
self._loop_writing()
# else: A write operation is already kicked off

self._maybe_pause_protocol()

def _loop_writing(self, fut=None):
try:
if self._conn_lost:
return

assert fut is self._write_fut
self._write_fut = None
if fut:
# We are in a _loop_writing() done callback, get the result
fut.result()

if not self._buffer or (self._conn_lost and self._address):
# The connection has been closed
if self._closing:
self._loop.call_soon(self._call_connection_lost, None)
return

data, addr = self._buffer.popleft()
if self._address is not None:
self._write_fut = self._loop._proactor.send(self._sock,
data)
else:
self._write_fut = self._loop._proactor.sendto(self._sock,
data,
addr=addr)
except OSError as exc:
self._protocol.error_received(exc)
except Exception as exc:
self._fatal_error(exc, 'Fatal write error on datagram transport')
else:
self._write_fut.add_done_callback(self._loop_writing)
self._maybe_resume_protocol()

def _loop_reading(self, fut=None):
data = None
try:
if self._conn_lost:
return

assert self._read_fut is fut or (self._read_fut is None and
self._closing)

self._read_fut = None
if fut is not None:
res = fut.result()

if self._closing:
# since close() has been called we ignore any read data
data = None
return

if self._address is not None:
data, addr = res, self._address
else:
data, addr = res

if self._conn_lost:
return
if self._address is not None:
self._read_fut = self._loop._proactor.recv(self._sock,
self.max_size)
else:
self._read_fut = self._loop._proactor.recvfrom(self._sock,
self.max_size)
except OSError as exc:
self._protocol.error_received(exc)
except exceptions.CancelledError:
if not self._closing:
raise
else:
if self._read_fut is not None:
self._read_fut.add_done_callback(self._loop_reading)
finally:
if data:
self._protocol.datagram_received(data, addr)


class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
_ProactorBaseWritePipeTransport,
transports.Transport):
Expand All @@ -455,22 +602,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
base_events._set_nodelay(sock)

def _set_extra(self, sock):
self._extra['socket'] = trsock.TransportSocket(sock)

try:
self._extra['sockname'] = sock.getsockname()
except (socket.error, AttributeError):
if self._loop.get_debug():
logger.warning(
"getsockname() failed on %r", sock, exc_info=True)

if 'peername' not in self._extra:
try:
self._extra['peername'] = sock.getpeername()
except (socket.error, AttributeError):
if self._loop.get_debug():
logger.warning("getpeername() failed on %r",
sock, exc_info=True)
_set_socket_extra(self, sock)

def can_write_eof(self):
return True
Expand Down Expand Up @@ -515,6 +647,11 @@ def _make_ssl_transport(
extra=extra, server=server)
return ssl_protocol._app_transport

def _make_datagram_transport(self, sock, protocol,
address=None, waiter=None, extra=None):
return _ProactorDatagramTransport(self, sock, protocol, address,
waiter, extra)

def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
extra=None):
return _ProactorDuplexPipeTransport(self,
Expand Down
46 changes: 46 additions & 0 deletions Lib/asyncio/windows_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,44 @@ def finish_recv(trans, key, ov):

return self._register(ov, conn, finish_recv)

def recvfrom(self, conn, nbytes, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
try:
ov.WSARecvFrom(conn.fileno(), nbytes, flags)
except BrokenPipeError:
return self._result((b'', None))

def finish_recv(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
_overlapped.ERROR_OPERATION_ABORTED):
raise ConnectionResetError(*exc.args)
else:
raise

return self._register(ov, conn, finish_recv)

def sendto(self, conn, buf, flags=0, addr=None):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)

ov.WSASendTo(conn.fileno(), buf, flags, addr)

def finish_send(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
_overlapped.ERROR_OPERATION_ABORTED):
raise ConnectionResetError(*exc.args)
else:
raise

return self._register(ov, conn, finish_send)

def send(self, conn, buf, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
Expand Down Expand Up @@ -532,6 +570,14 @@ async def accept_coro(future, conn):
return future

def connect(self, conn, address):
if conn.type == socket.SOCK_DGRAM:
# WSAConnect will complete immediately for UDP sockets so we don't
# need to register any IOCP operation
_overlapped.WSAConnect(conn.fileno(), address)
fut = self._loop.create_future()
fut.set_result(None)
return fut

self._register_with_iocp(conn)
# The socket needs to be locally bound before we call ConnectEx().
try:
Expand Down
9 changes: 0 additions & 9 deletions Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1249,11 +1249,6 @@ def datagram_received(self, data, addr):
server.transport.close()

def test_create_datagram_endpoint_sock(self):
if (sys.platform == 'win32' and
isinstance(self.loop, proactor_events.BaseProactorEventLoop)):
raise unittest.SkipTest(
'UDP is not supported with proactor event loops')

sock = None
local_address = ('127.0.0.1', 0)
infos = self.loop.run_until_complete(
Expand Down Expand Up @@ -2004,10 +1999,6 @@ def test_writer_callback(self):
def test_writer_callback_cancel(self):
raise unittest.SkipTest("IocpEventLoop does not have add_writer()")

def test_create_datagram_endpoint(self):
raise unittest.SkipTest(
"IocpEventLoop does not have create_datagram_endpoint()")

def test_remove_fds_after_closing(self):
raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
else:
Expand Down
Loading