Skip to content
15 changes: 15 additions & 0 deletions Doc/library/asyncio-stream.rst
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,21 @@ StreamWriter

Close the transport: see :meth:`BaseTransport.close`.

.. method:: is_closing()

Return ``True`` if the writer is closing or is closed.

.. versionadded:: 3.7

.. coroutinemethod:: wait_closed()

Wait until the writer is closed.

Should be called after :meth:`close` to wait until the underlying
connection (and the associated transport/protocol pair) is closed.

.. versionadded:: 3.7

.. coroutinemethod:: drain()

Let the write buffer of the underlying transport a chance to be flushed.
Expand Down
36 changes: 27 additions & 9 deletions Lib/asyncio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def __init__(self, stream_reader, client_connected_cb=None, loop=None):
self._stream_writer = None
self._client_connected_cb = client_connected_cb
self._over_ssl = False
self._closed = self._loop.create_future()

def connection_made(self, transport):
self._stream_reader.set_transport(transport)
Expand All @@ -243,6 +244,11 @@ def connection_lost(self, exc):
self._stream_reader.feed_eof()
else:
self._stream_reader.set_exception(exc)
if not self._closed.done():
if exc is None:
self._closed.set_result(None)
else:
self._closed.set_exception(exc)
super().connection_lost(exc)
self._stream_reader = None
self._stream_writer = None
Expand All @@ -259,6 +265,13 @@ def eof_received(self):
return False
return True

def __del__(self):
# Prevent reports about unhandled exceptions.
# Better than self._closed._log_traceback = False hack
closed = self._closed
if closed.done() and not closed.cancelled():
closed.exception()


class StreamWriter:
"""Wraps a Transport.
Expand Down Expand Up @@ -303,6 +316,12 @@ def can_write_eof(self):
def close(self):
return self._transport.close()

def is_closing(self):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a new API method? Please document it if it is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot about the change - but exposing the method makes sense

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a bit too low-level. Can you imagine a good use case for async/await code that uses writer and reader?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamWriter exposes all Transport methods except flow control related.
is_closing() is not about flow control, I see no reason to not implement it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, go ahead.

return self._transport.is_closing()

async def wait_closed(self):
await self._protocol._closed

def get_extra_info(self, name, default=None):
return self._transport.get_extra_info(name, default)

Expand All @@ -318,15 +337,14 @@ async def drain(self):
exc = self._reader.exception()
if exc is not None:
raise exc
if self._transport is not None:
if self._transport.is_closing():
# Yield to the event loop so connection_lost() may be
# called. Without this, _drain_helper() would return
# immediately, and code that calls
# write(...); await drain()
# in a loop would never call connection_lost(), so it
# would not see an error when the socket is closed.
await sleep(0, loop=self._loop)
if self._transport.is_closing():
# Yield to the event loop so connection_lost() may be
# called. Without this, _drain_helper() would return
# immediately, and code that calls
# write(...); await drain()
# in a loop would never call connection_lost(), so it
# would not see an error when the socket is closed.
await sleep(0, loop=self._loop)
await self._protocol._drain_helper()


Expand Down
31 changes: 30 additions & 1 deletion Lib/test/test_asyncio/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from test.test_asyncio import utils as test_utils


class StreamReaderTests(test_utils.TestCase):
class StreamTests(test_utils.TestCase):

DATA = b'line1\nline2\nline3\n'

Expand Down Expand Up @@ -860,6 +860,35 @@ def test_LimitOverrunError_pickleable(self):
self.assertEqual(str(e), str(e2))
self.assertEqual(e.consumed, e2.consumed)

def test_wait_closed_on_close(self):
with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address, loop=self.loop))

wr.write(b'GET / HTTP/1.0\r\n\r\n')
f = rd.readline()
data = self.loop.run_until_complete(f)
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
f = rd.read()
data = self.loop.run_until_complete(f)
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
self.assertFalse(wr.is_closing())
wr.close()
self.assertTrue(wr.is_closing())
self.loop.run_until_complete(wr.wait_closed())

def test_wait_closed_on_close_with_unread_data(self):
with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address, loop=self.loop))

wr.write(b'GET / HTTP/1.0\r\n\r\n')
f = rd.readline()
data = self.loop.run_until_complete(f)
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
wr.close()
self.loop.run_until_complete(wr.wait_closed())


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement :meth:`asyncio.StreamWriter.wait_closed` and :meth:`asyncio.StreamWriter.is_closing` methods
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One last note: I don't think you want to use ReST formatting in NEWS entries. This would read better:

Implement wait_closed() and is_closing() methods for StreamWriter.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no standard for blurb text, the format of messages is mixed.
Some people use ReST, others prefer plain text.
If you briefly look on these files you'll see both cases.

I have no personal preference.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really care.