Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 24 additions & 30 deletions Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,8 @@ Creating connections
socket type :py:data:`~socket.SOCK_STREAM`. *protocol_factory* must be a
callable returning a :ref:`protocol <asyncio-protocol>` instance.

This method is a :ref:`coroutine <coroutine>` which will try to
establish the connection in the background. When successful, the
coroutine returns a ``(transport, protocol)`` pair.
This method will try to establish the connection in the background.
When successful, it returns a ``(transport, protocol)`` pair.

The chronological synopsis of the underlying operation is as follows:

Expand Down Expand Up @@ -344,9 +343,8 @@ Creating connections
:py:data:`~socket.SOCK_DGRAM`. *protocol_factory* must be a
callable returning a :ref:`protocol <asyncio-protocol>` instance.

This method is a :ref:`coroutine <coroutine>` which will try to
establish the connection in the background. When successful, the
coroutine returns a ``(transport, protocol)`` pair.
This method will try to establish the connection in the background.
When successful, the it returns a ``(transport, protocol)`` pair.

Options changing how the connection is created:

Expand Down Expand Up @@ -395,9 +393,8 @@ Creating connections
family is used to communicate between processes on the same machine
efficiently.

This method is a :ref:`coroutine <coroutine>` which will try to
establish the connection in the background. When successful, the
coroutine returns a ``(transport, protocol)`` pair.
This method will try to establish the connection in the background.
When successful, the it returns a ``(transport, protocol)`` pair.

*path* is the name of a UNIX domain socket, and is required unless a *sock*
parameter is specified. Abstract UNIX sockets, :class:`str`,
Expand Down Expand Up @@ -459,8 +456,6 @@ Creating listening connections
set this flag when being created. This option is not supported on
Windows.

This method is a :ref:`coroutine <coroutine>`.

.. versionchanged:: 3.5

On Windows with :class:`ProactorEventLoop`, SSL/TLS is now supported.
Expand All @@ -484,8 +479,6 @@ Creating listening connections
parameter is specified. Abstract UNIX sockets, :class:`str`,
:class:`bytes`, and :class:`~pathlib.Path` paths are supported.

This method is a :ref:`coroutine <coroutine>`.

Availability: UNIX.

.. versionchanged:: 3.7
Expand All @@ -507,8 +500,7 @@ Creating listening connections
* *ssl* can be set to an :class:`~ssl.SSLContext` to enable SSL over the
accepted connections.

This method is a :ref:`coroutine <coroutine>`. When completed, the
coroutine returns a ``(transport, protocol)`` pair.
When completed it returns a ``(transport, protocol)`` pair.

.. versionadded:: 3.5.3

Expand Down Expand Up @@ -565,7 +557,10 @@ Low-level socket operations
With :class:`SelectorEventLoop` event loop, the socket *sock* must be
non-blocking.

This method is a :ref:`coroutine <coroutine>`.
.. versionchanged:: 3.7
Even though the method was always documented as a coroutine
method, before Python 3.7 it returned a :class:`Future`.
Since Python 3.7, this is an ``async def`` method.

.. coroutinemethod:: AbstractEventLoop.sock_recv_into(sock, buf)

Expand All @@ -578,8 +573,6 @@ Low-level socket operations
With :class:`SelectorEventLoop` event loop, the socket *sock* must be
non-blocking.

This method is a :ref:`coroutine <coroutine>`.

.. versionadded:: 3.7

.. coroutinemethod:: AbstractEventLoop.sock_sendall(sock, data)
Expand All @@ -596,7 +589,10 @@ Low-level socket operations
With :class:`SelectorEventLoop` event loop, the socket *sock* must be
non-blocking.

This method is a :ref:`coroutine <coroutine>`.
.. versionchanged:: 3.7
Even though the method was always documented as a coroutine
method, before Python 3.7 it returned an :class:`Future`.
Since Python 3.7, this is an ``async def`` method.

.. coroutinemethod:: AbstractEventLoop.sock_connect(sock, address)

Expand All @@ -606,8 +602,6 @@ Low-level socket operations
With :class:`SelectorEventLoop` event loop, the socket *sock* must be
non-blocking.

This method is a :ref:`coroutine <coroutine>`.

.. versionchanged:: 3.5.2
``address`` no longer needs to be resolved. ``sock_connect``
will try to check if the *address* is already resolved by calling
Expand All @@ -634,7 +628,10 @@ Low-level socket operations

The socket *sock* must be non-blocking.

This method is a :ref:`coroutine <coroutine>`.
.. versionchanged:: 3.7
Even though the method was always documented as a coroutine
method, before Python 3.7 it returned a :class:`Future`.
Since Python 3.7, this is an ``async def`` method.

.. seealso::

Expand Down Expand Up @@ -673,8 +670,6 @@ Use :class:`ProactorEventLoop` to support pipes on Windows.
With :class:`SelectorEventLoop` event loop, the *pipe* is set to
non-blocking mode.

This method is a :ref:`coroutine <coroutine>`.

.. coroutinemethod:: AbstractEventLoop.connect_write_pipe(protocol_factory, pipe)

Register write pipe in eventloop.
Expand All @@ -687,8 +682,6 @@ Use :class:`ProactorEventLoop` to support pipes on Windows.
With :class:`SelectorEventLoop` event loop, the *pipe* is set to
non-blocking mode.

This method is a :ref:`coroutine <coroutine>`.

.. seealso::

The :meth:`AbstractEventLoop.subprocess_exec` and
Expand Down Expand Up @@ -738,15 +731,18 @@ pool of processes). By default, an event loop uses a thread pool executor
:ref:`Use functools.partial to pass keywords to the *func*
<asyncio-pass-keywords>`.

This method is a :ref:`coroutine <coroutine>`.

.. versionchanged:: 3.5.3
:meth:`BaseEventLoop.run_in_executor` no longer configures the
``max_workers`` of the thread pool executor it creates, instead
leaving it up to the thread pool executor
(:class:`~concurrent.futures.ThreadPoolExecutor`) to set the
default.

.. versionchanged:: 3.7
Even though the method was always documented as a coroutine
method, before Python 3.7 it returned a :class:`Future`.
Since Python 3.7, this is an ``async def`` method.

.. method:: AbstractEventLoop.set_default_executor(executor)

Set the default executor used by :meth:`run_in_executor`.
Expand Down Expand Up @@ -857,8 +853,6 @@ Server

Wait until the :meth:`close` method completes.

This method is a :ref:`coroutine <coroutine>`.

.. attribute:: sockets

List of :class:`socket.socket` objects the server is listening to, or
Expand Down
85 changes: 39 additions & 46 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,6 @@ def _ipaddr_info(host, port, family, type, proto):
return None


def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0,
flags=0, loop):
host, port = address[:2]
info = _ipaddr_info(host, port, family, type, proto)
if info is not None:
# "host" is already a resolved IP.
fut = loop.create_future()
fut.set_result([info])
return fut
else:
return loop.getaddrinfo(host, port, family=family, type=type,
proto=proto, flags=flags)


def _run_until_complete_cb(fut):
exc = fut._exception
if isinstance(exc, BaseException) and not isinstance(exc, Exception):
Expand Down Expand Up @@ -614,7 +600,7 @@ def call_soon_threadsafe(self, callback, *args):
self._write_to_self()
return handle

def run_in_executor(self, executor, func, *args):
async def run_in_executor(self, executor, func, *args):
self._check_closed()
if self._debug:
self._check_callback(func, 'run_in_executor')
Expand All @@ -623,7 +609,8 @@ def run_in_executor(self, executor, func, *args):
if executor is None:
executor = concurrent.futures.ThreadPoolExecutor()
self._default_executor = executor
return futures.wrap_future(executor.submit(func, *args), loop=self)
return await futures.wrap_future(
executor.submit(func, *args), loop=self)

def set_default_executor(self, executor):
self._default_executor = executor
Expand Down Expand Up @@ -652,17 +639,19 @@ def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
logger.debug(msg)
return addrinfo

def getaddrinfo(self, host, port, *,
family=0, type=0, proto=0, flags=0):
async def getaddrinfo(self, host, port, *,
family=0, type=0, proto=0, flags=0):
if self._debug:
return self.run_in_executor(None, self._getaddrinfo_debug,
host, port, family, type, proto, flags)
getaddr_func = self._getaddrinfo_debug
else:
return self.run_in_executor(None, socket.getaddrinfo,
host, port, family, type, proto, flags)
getaddr_func = socket.getaddrinfo

def getnameinfo(self, sockaddr, flags=0):
return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
return await self.run_in_executor(
None, getaddr_func, host, port, family, type, proto, flags)

async def getnameinfo(self, sockaddr, flags=0):
return await self.run_in_executor(
None, socket.getnameinfo, sockaddr, flags)

async def create_connection(self, protocol_factory, host=None, port=None,
*, ssl=None, family=0,
Expand Down Expand Up @@ -703,25 +692,17 @@ async def create_connection(self, protocol_factory, host=None, port=None,
raise ValueError(
'host/port and sock can not be specified at the same time')

f1 = _ensure_resolved((host, port), family=family,
type=socket.SOCK_STREAM, proto=proto,
flags=flags, loop=self)
fs = [f1]
if local_addr is not None:
f2 = _ensure_resolved(local_addr, family=family,
type=socket.SOCK_STREAM, proto=proto,
flags=flags, loop=self)
fs.append(f2)
else:
f2 = None

await tasks.wait(fs, loop=self)

infos = f1.result()
infos = await self._ensure_resolved(
(host, port), family=family,
type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
if not infos:
raise OSError('getaddrinfo() returned empty list')
if f2 is not None:
laddr_infos = f2.result()

if local_addr is not None:
laddr_infos = await self._ensure_resolved(
local_addr, family=family,
type=socket.SOCK_STREAM, proto=proto,
flags=flags, loop=self)
if not laddr_infos:
raise OSError('getaddrinfo() returned empty list')

Expand All @@ -730,7 +711,7 @@ async def create_connection(self, protocol_factory, host=None, port=None,
try:
sock = socket.socket(family=family, type=type, proto=proto)
sock.setblocking(False)
if f2 is not None:
if local_addr is not None:
for _, _, _, _, laddr in laddr_infos:
try:
sock.bind(laddr)
Expand Down Expand Up @@ -863,7 +844,7 @@ async def create_datagram_endpoint(self, protocol_factory,
assert isinstance(addr, tuple) and len(addr) == 2, (
'2-tuple is expected')

infos = await _ensure_resolved(
infos = await self._ensure_resolved(
addr, family=family, type=socket.SOCK_DGRAM,
proto=proto, flags=flags, loop=self)
if not infos:
Expand Down Expand Up @@ -946,10 +927,22 @@ async def create_datagram_endpoint(self, protocol_factory,

return transport, protocol

async def _ensure_resolved(self, address, *,
family=0, type=socket.SOCK_STREAM,
proto=0, flags=0, loop):
host, port = address[:2]
info = _ipaddr_info(host, port, family, type, proto)
if info is not None:
# "host" is already a resolved IP.
return [info]
else:
return await loop.getaddrinfo(host, port, family=family, type=type,
proto=proto, flags=flags)

async def _create_server_getaddrinfo(self, host, port, family, flags):
infos = await _ensure_resolved((host, port), family=family,
type=socket.SOCK_STREAM,
flags=flags, loop=self)
infos = await self._ensure_resolved((host, port), family=family,
type=socket.SOCK_STREAM,
flags=flags, loop=self)
if not infos:
raise OSError(f'getaddrinfo({host!r}) returned empty list')
return infos
Expand Down
20 changes: 10 additions & 10 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,20 +432,20 @@ def close(self):
# Close the event loop
super().close()

def sock_recv(self, sock, n):
return self._proactor.recv(sock, n)
async def sock_recv(self, sock, n):
return await self._proactor.recv(sock, n)

def sock_recv_into(self, sock, buf):
return self._proactor.recv_into(sock, buf)
async def sock_recv_into(self, sock, buf):
return await self._proactor.recv_into(sock, buf)

def sock_sendall(self, sock, data):
return self._proactor.send(sock, data)
async def sock_sendall(self, sock, data):
return await self._proactor.send(sock, data)

def sock_connect(self, sock, address):
return self._proactor.connect(sock, address)
async def sock_connect(self, sock, address):
return await self._proactor.connect(sock, address)

def sock_accept(self, sock):
return self._proactor.accept(sock)
async def sock_accept(self, sock):
return await self._proactor.accept(sock)

def _close_self_pipe(self):
if self._self_reading_future is not None:
Expand Down
Loading