Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 82 additions & 5 deletions Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ Creating connections
Creating listening connections
------------------------------

.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None)
.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to
*host* and *port*.
Expand Down Expand Up @@ -472,9 +472,15 @@ Creating listening connections
for the SSL handshake to complete before aborting the connection.
``10.0`` seconds if ``None`` (default).

* *start_serving* set to ``True`` (the default) causes the created server
to start accepting connections immediately. When set to ``False``,
the user should await on :meth:`Server.start_serving` or
:meth:`Server.serve_forever` to make the server to start accepting
connections.

.. versionadded:: 3.7

The *ssl_handshake_timeout* parameter.
*ssl_handshake_timeout* and *start_serving* parameters.

.. versionchanged:: 3.5

Expand All @@ -490,7 +496,7 @@ Creating listening connections
The *host* parameter can now be a sequence of strings.


.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None)
.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Similar to :meth:`AbstractEventLoop.create_server`, but specific to the
socket family :py:data:`~socket.AF_UNIX`.
Expand Down Expand Up @@ -929,8 +935,26 @@ Server

Server listening on sockets.

Object created by the :meth:`AbstractEventLoop.create_server` method and the
:func:`start_server` function. Don't instantiate the class directly.
Object created by :meth:`AbstractEventLoop.create_server`,
:meth:`AbstractEventLoop.create_unix_server`, :func:`start_server`,
and :func:`start_unix_server` functions. Don't instantiate the class
directly.

*Server* objects are asynchronous context managers. When used in an
``async with`` statement, it's guaranteed that the Server object is
closed and not accepting new connections when the ``async with``
statement is completed::

srv = await loop.create_server(...)

async with srv:
# some code

# At this point, srv is closed and no longer accepts new connections.


.. versionchanged:: 3.7
Server object is an asynchronous context manager since Python 3.7.

.. method:: close()

Expand All @@ -949,6 +973,54 @@ Server

.. versionadded:: 3.7

.. coroutinemethod:: start_serving()

Start accepting connections.

This method is idempotent, so it can be called when
the server is already being serving.

The new *start_serving* keyword-only parameter to
:meth:`AbstractEventLoop.create_server` and
:meth:`asyncio.start_server` allows to create a Server object
that is not accepting connections right away. In which case
this method, or :meth:`Server.serve_forever` can be used
to make the Server object to start accepting connections.

.. versionadded:: 3.7

.. coroutinemethod:: serve_forever()

Start accepting connections until the coroutine is cancelled.
Cancellation of ``serve_forever`` task causes the server
to be closed.

This method can be called if the server is already accepting
connections. Only one ``serve_forever`` task can exist per
one *Server* object.

Example::

async def client_connected(reader, writer):
# Communicate with the client with
# reader/writer streams. For example:
await reader.readline()

async def main(host, port):
srv = await asyncio.start_server(
client_connected, host, port)
await loop.serve_forever()

asyncio.run(main('127.0.0.1', 0))

.. versionadded:: 3.7

.. method:: is_serving()

Return ``True`` if the server is accepting new connections.

.. versionadded:: 3.7

.. coroutinemethod:: wait_closed()

Wait until the :meth:`close` method completes.
Expand All @@ -958,6 +1030,11 @@ Server
List of :class:`socket.socket` objects the server is listening to, or
``None`` if the server is closed.

.. versionchanged:: 3.7
Prior to Python 3.7 ``Server.sockets`` used to return the
internal list of server's sockets directly. In 3.7 a copy
of that list is returned.


Handle
------
Expand Down
102 changes: 82 additions & 20 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,47 +157,106 @@ def _run_until_complete_cb(fut):

class Server(events.AbstractServer):

def __init__(self, loop, sockets):
def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
ssl_handshake_timeout):
self._loop = loop
self.sockets = sockets
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backward incompatible change.
I pretty sure 99.9% of users newer modified the attribute but we should have a big warning about it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no point in modifying this attribute ever, but I agree, it needs to be documented. Will fix in the documentation.

self._sockets = sockets
self._active_count = 0
self._waiters = []
self._protocol_factory = protocol_factory
self._backlog = backlog
self._ssl_context = ssl_context
self._ssl_handshake_timeout = ssl_handshake_timeout
self._serving = False
self._serving_forever_fut = None

def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

def _attach(self):
assert self.sockets is not None
assert self._sockets is not None
self._active_count += 1

def _detach(self):
assert self._active_count > 0
self._active_count -= 1
if self._active_count == 0 and self.sockets is None:
if self._active_count == 0 and self._sockets is None:
self._wakeup()

def _wakeup(self):
waiters = self._waiters
self._waiters = None
for waiter in waiters:
if not waiter.done():
waiter.set_result(waiter)

def _start_serving(self):
if self._serving:
return
self._serving = True
for sock in self._sockets:
sock.listen(self._backlog)
self._loop._start_serving(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is loop's private method call.
For me it's a bad smell.
Can we implement the feature with public API calls only?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we can't, without re-architecturing half of create server machinery and exposing a new event loop public API. I have no interest in doing that.

And I don't see why this is a problem -- Loop and Server are designed to work together. Loop creates Server objects and knows everything about them. This is exactly the same relationship as between Transports and Loops.

Also you can't use Server object from uvloop and vice versa, so I don't see any problem here either.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I don't insist. We have many tightly coupled classes in asyncio already.

self._protocol_factory, sock, self._ssl_context,
self, self._backlog, self._ssl_handshake_timeout)

def get_loop(self):
return self._loop

def is_serving(self):
return self._serving

@property
def sockets(self):
if self._sockets is None:
return []
return list(self._sockets)

def close(self):
sockets = self.sockets
sockets = self._sockets
if sockets is None:
return
self.sockets = None
self._sockets = None

for sock in sockets:
self._loop._stop_serving(sock)

self._serving = False

if (self._serving_forever_fut is not None and
not self._serving_forever_fut.done()):
self._serving_forever_fut.cancel()
self._serving_forever_fut = None

if self._active_count == 0:
self._wakeup()

def get_loop(self):
return self._loop
async def start_serving(self):
self._start_serving()

def _wakeup(self):
waiters = self._waiters
self._waiters = None
for waiter in waiters:
if not waiter.done():
waiter.set_result(waiter)
async def serve_forever(self):
if self._serving_forever_fut is not None:
raise RuntimeError(
f'server {self!r} is already being awaited on serve_forever()')
if self._sockets is None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should check self._serving attribute too maybe.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we document this method as idempotent as related to server already accepting connections. I.e. it's fine to do this:

await server.start_serving()
await server.serve_forever()

The reason for this design is backwards-compatibility: loop.create_server by default returns a Server object that's already serving! So server.serve_forever() must be idempotent.

I found the utility in server.start_serving() when I was writing unittests. Sometimes we need to deterministically know when a server just started to listen, otherwise, for instance, it would be impossible for me to write a working loop.create_unix_server unittest.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explanation

raise RuntimeError(f'server {self!r} is closed')

self._start_serving()
self._serving_forever_fut = self._loop.create_future()

try:
await self._serving_forever_fut
except futures.CancelledError:
try:
self.close()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serve_forever() coroutine calls self.close() but doesn't wait for actual closing (await self.wait_closed()).
It is strange and complicated the usage: the proper code should always be:

try:
    await server.serve_forever()
except Exception:
     await server.wait_closed()
     raise

Moreover server.sockets contains listening sockets only, long running accepted connections should be closed separately. I think it should be mentioned in documentation. Unfortunately I see no way how to fix the problem quickly (and the fix is out of scope of the PR, sure).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serve_forever() coroutine calls self.close() but doesn't wait for actual closing (await self.wait_closed()).

OK, will do that.

Moreover server.sockets contains listening sockets only, long running accepted connections should be closed separately. I think it should be mentioned in documentation. Unfortunately I see no way how to fix the problem quickly (and the fix is out of scope of the PR, sure).

Yes, I agree :( Need to fix that in 3.8.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's discuss it later. Actually at least in aiohttp we need stop listening first (server.close() does the job perfectly) and close all already accepted connections in controlled way (with signals for graceful shutdown and close timeout).

await self.wait_closed()
finally:
raise
finally:
self._serving_forever_fut = None

async def wait_closed(self):
if self.sockets is None or self._waiters is None:
if self._sockets is None or self._waiters is None:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
Expand Down Expand Up @@ -1059,7 +1118,8 @@ async def create_server(
ssl=None,
reuse_address=None,
reuse_port=None,
ssl_handshake_timeout=None):
ssl_handshake_timeout=None,
start_serving=True):
"""Create a TCP server.

The host parameter can be a string, in that case the TCP server is
Expand Down Expand Up @@ -1149,12 +1209,14 @@ async def create_server(
raise ValueError(f'A Stream Socket was expected, got {sock!r}')
sockets = [sock]

server = Server(self, sockets)
for sock in sockets:
sock.listen(backlog)
sock.setblocking(False)
self._start_serving(protocol_factory, sock, ssl, server, backlog,
ssl_handshake_timeout)

server = Server(self, sockets, protocol_factory,
ssl, backlog, ssl_handshake_timeout)
if start_serving:
server._start_serving()

if self._debug:
logger.info("%r is serving", server)
return server
Expand Down
48 changes: 43 additions & 5 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,39 @@ def close(self):
"""Stop serving. This leaves existing connections open."""
raise NotImplementedError

def get_loop(self):
"""Get the event loop the Server object is attached to."""
raise NotImplementedError

def is_serving(self):
"""Return True if the server is accepting connections."""
raise NotImplementedError

async def start_serving(self):
"""Start accepting connections.

This method is idempotent, so it can be called when
the server is already being serving.
"""
raise NotImplementedError

async def serve_forever(self):
"""Start accepting connections until the coroutine is cancelled.

The server is closed when the coroutine is cancelled.
"""
raise NotImplementedError

async def wait_closed(self):
"""Coroutine to wait until service is closed."""
raise NotImplementedError

def get_loop(self):
""" Get the event loop the Server object is attached to."""
raise NotImplementedError
async def __aenter__(self):
return self

async def __aexit__(self, *exc):
self.close()
await self.wait_closed()


class AbstractEventLoop:
Expand Down Expand Up @@ -279,7 +305,8 @@ async def create_server(
*, family=socket.AF_UNSPEC,
flags=socket.AI_PASSIVE, sock=None, backlog=100,
ssl=None, reuse_address=None, reuse_port=None,
ssl_handshake_timeout=None):
ssl_handshake_timeout=None,
start_serving=True):
"""A coroutine which creates a TCP server bound to host and port.

The return value is a Server object which can be used to stop
Expand Down Expand Up @@ -319,6 +346,11 @@ async def create_server(
will wait for completion of the SSL handshake before aborting the
connection. Default is 10s, longer timeouts may increase vulnerability
to DoS attacks (see https://support.f5.com/csp/article/K13834)

start_serving set to True (default) causes the created server
to start accepting connections immediately. When set to False,
the user should await Server.start_serving() or Server.serve_forever()
to make the server to start accepting connections.
"""
raise NotImplementedError

Expand All @@ -343,7 +375,8 @@ async def create_unix_connection(
async def create_unix_server(
self, protocol_factory, path=None, *,
sock=None, backlog=100, ssl=None,
ssl_handshake_timeout=None):
ssl_handshake_timeout=None,
start_serving=True):
"""A coroutine which creates a UNIX Domain Socket server.

The return value is a Server object, which can be used to stop
Expand All @@ -363,6 +396,11 @@ async def create_unix_server(

ssl_handshake_timeout is the time in seconds that an SSL server
will wait for the SSL handshake to complete (defaults to 10s).

start_serving set to True (default) causes the created server
to start accepting connections immediately. When set to False,
the user should await Server.start_serving() or Server.serve_forever()
to make the server to start accepting connections.
"""
raise NotImplementedError

Expand Down
12 changes: 7 additions & 5 deletions Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ async def create_unix_connection(
async def create_unix_server(
self, protocol_factory, path=None, *,
sock=None, backlog=100, ssl=None,
ssl_handshake_timeout=None):
ssl_handshake_timeout=None,
start_serving=True):
if isinstance(ssl, bool):
raise TypeError('ssl argument must be an SSLContext or None')

Expand Down Expand Up @@ -302,11 +303,12 @@ async def create_unix_server(
raise ValueError(
f'A UNIX Domain Stream Socket was expected, got {sock!r}')

server = base_events.Server(self, [sock])
sock.listen(backlog)
sock.setblocking(False)
self._start_serving(protocol_factory, sock, ssl, server,
ssl_handshake_timeout=ssl_handshake_timeout)
server = base_events.Server(self, [sock], protocol_factory,
ssl, backlog, ssl_handshake_timeout)
if start_serving:
server._start_serving()

return server

async def _sock_sendfile_native(self, sock, file, offset, count):
Expand Down
Loading