From fa07b3547a05f0872d483d490159596a395ee39c Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 22 Jan 2018 17:25:24 +0200 Subject: [PATCH 01/31] Implement loop.sendfile() --- Lib/asyncio/base_events.py | 111 +++++++++++++++++ Lib/asyncio/constants.py | 9 ++ Lib/asyncio/events.py | 8 ++ Lib/asyncio/selector_events.py | 38 ++++++ Lib/asyncio/sslproto.py | 7 ++ Lib/test/test_asyncio/test_base_events.py | 2 +- Lib/test/test_asyncio/test_events.py | 138 +++++++++++++++++++++- 7 files changed, 309 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index ca9eee765e3531..415159f772a8ea 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -38,6 +38,7 @@ from . import coroutines from . import events from . import futures +from . import protocols from . import sslproto from . import tasks from .log import logger @@ -155,6 +156,60 @@ def _run_until_complete_cb(fut): futures._get_loop(fut).stop() + +class _SendfileProtocol(protocols.Protocol): + def __init__(self, transp): + # transport should be _FlowControlMixin instance + self._transport = transp + self._proto = transp.get_protocol() + self._resume_reading = transp.is_reading() + self._write_paused = transp._protocol_paused + transp.pause_reading() + transp.set_protocol(self) + if self._write_paused: + self._paused = self._transport._loop.create_future() + else: + self._paused = None + + def connection_made(self, transport): + raise RuntimeError("Broken state, " + "connection should be established already.") + + def connection_lost(self, exc): + if self._paused is not None: + if exc is None: + self._paused.set_result(True) + else: + self._paused.set_exception(exc) + self._proto.connection_lost(exc) + + def pause_writing(self): + if self._paused is not None: + return + self._paused = self._transport._loop.create_future() + + def resume_writing(self): + if self._paused is None: + return + self._paused.set_result(False) + self._paused = None + + def data_received(self, data): + raise RuntimeError("Broken state, reading should be paused") + + def eof_received(self): + raise RuntimeError("Broken state, reading should be paused") + + async def restore(self): + self._transport.set_protocol(self._proto) + if self._resume_reading: + self._transport.resume_reading() + if self._paused is not None: + await self._paused + if self._write_paused: + self._proto.resume_writing() + + class Server(events.AbstractServer): def __init__(self, loop, sockets): @@ -866,6 +921,62 @@ async def _create_connection_transport( return transport, protocol + async def sendfile(self, transport, file, offset=0, count=None, + *, fallback=True): + """Send a file through a transport. + + Return amount of sent bytes. + """ + mode = getattr(transport, '_sendfile_compatible', + constants._SendfileMode.UNSUPPORTED) + if mode is constants._SendfileMode.UNSUPPORTED: + raise RuntimeError( + f"sendfile is not supported for transport {transport!r}") + if mode is constants._SendfileMode.NATIVE: + try: + await self._sendfile_native(transport, file, + offset, count) + return + except events.SendfileNotAvailableError as exc: + if not fallback: + raise + # the mode is FALLBACK or fallback is True + await self._sendfile_fallback(transport, file, + offset, count) + + async def _sendfile_native(self, transp, file, offset, count): + raise events.SendfileNotAvailableError( + "sendfile syscall is not supported") + + async def _sendfile_fallback(self, transp, file, offset, count): + if offset: + file.seek(offset) + blocksize = min(count, 16384) if count else 16384 + buf = bytearray(blocksize) + total_sent = 0 + proto = _SendfileProtocol(transp) + try: + while True: + if count: + blocksize = min(count - total_sent, blocksize) + if blocksize <= 0: + break + fut = proto._paused + if fut is not None: + if await fut: + # eof received + return + view = memoryview(buf)[:blocksize] + read = file.readinto(view) + if not read: + break # EOF + transp.write(view) + total_sent += read + finally: + if total_sent > 0 and hasattr(file, 'seek'): + file.seek(offset + total_sent) + await proto.restore() + async def start_tls(self, transport, protocol, sslcontext, *, server_side=False, server_hostname=None, diff --git a/Lib/asyncio/constants.py b/Lib/asyncio/constants.py index 0ad974ff2fb9d5..d5fed658a58ab4 100644 --- a/Lib/asyncio/constants.py +++ b/Lib/asyncio/constants.py @@ -1,3 +1,5 @@ +import enum + # After the connection is lost, log warnings after this many write()s. LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5 @@ -11,3 +13,10 @@ # Number of seconds to wait for SSL handshake to complete SSL_HANDSHAKE_TIMEOUT = 10.0 + +# The enum should be here to break circular dependencies between +# base_events and sslproto +class _SendfileMode(enum.Enum): + UNSUPPORTED = enum.auto() + NATIVE = enum.auto() + FALLBACK = enum.auto() diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index d5365dc480d3d3..9579a19ed537fa 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -317,6 +317,14 @@ async def create_server( """ raise NotImplementedError + async def sendfile(self, transport, file, offset=0, count=None, + *, fallback=True): + """Send a file through a transport. + + Return an amount of sent bytes. + """ + raise NotImplementedError + async def start_tls(self, transport, protocol, sslcontext, *, server_side=False, server_hostname=None, diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 5692e38486a066..19fddad28ec8da 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -540,6 +540,22 @@ def _sock_accept(self, fut, registered, sock): else: fut.set_result((conn, address)) + async def _sendfile_native(self, transp, file, offset, count): + del self._transports[transp._sock_fd] + resume_reading = transp.is_reading() + transp.pause_reading() + fut = transp._make_empty_waiter() + await fut + sock = transp._sock + try: + return await self.sock_sendfile(sock, file, offset, count, + fallback=False) + finally: + transp._reset_empty_waiter() + if resume_reading: + transp.resume_reading() + self._transports[transp._sock_fd] = transp + def _process_events(self, event_list): for key, mask in event_list: fileobj, (reader, writer) = key.fileobj, key.data @@ -695,12 +711,14 @@ def get_write_buffer_size(self): class _SelectorSocketTransport(_SelectorTransport): _start_tls_compatible = True + _sendfile_compatible = constants._SendfileMode.NATIVE def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None): super().__init__(loop, sock, protocol, extra, server) self._eof = False self._paused = False + self._empty_waiter = None # Disable the Nagle algorithm -- small writes will be # sent without waiting for the TCP ACK. This generally @@ -765,6 +783,9 @@ def write(self, data): f'not {type(data).__name__!r}') if self._eof: raise RuntimeError('Cannot call write() after write_eof()') + if self._empty_waiter is not None: + raise RuntimeError('Cannot call write() when loop.sendfile() ' + 'is not finished') if not data: return @@ -798,6 +819,8 @@ def _write_ready(self): assert self._buffer, 'Data should not be empty' if self._conn_lost: + if self._empty_waiter is not None: + self._empty_waiter.set_result(True) return try: n = self._sock.send(self._buffer) @@ -807,12 +830,16 @@ def _write_ready(self): self._loop._remove_writer(self._sock_fd) self._buffer.clear() self._fatal_error(exc, 'Fatal write error on socket transport') + if self._empty_waiter is not None: + self._empty_waiter.set_exception(exc) else: if n: del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. if not self._buffer: self._loop._remove_writer(self._sock_fd) + if self._empty_waiter is not None: + self._empty_waiter.set_result(False) if self._closing: self._call_connection_lost(None) elif self._eof: @@ -828,6 +855,17 @@ def write_eof(self): def can_write_eof(self): return True + def _make_empty_waiter(self): + if self._empty_waiter is not None: + raise RuntimeError("Empty waiter is already set") + self._empty_waiter = self._loop.create_future() + if not self._buffer: + self._empty_waiter.set_result(None) + return self._empty_waiter + + def _reset_empty_waiter(self): + self._empty_waiter = None + class _SelectorDatagramTransport(_SelectorTransport): diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index 2d377c4ae39b5d..224b8190076f15 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -282,6 +282,8 @@ def feed_appdata(self, data, offset=0): class _SSLProtocolTransport(transports._FlowControlMixin, transports.Transport): + _sendfile_compatible = constants._SendfileMode.FALLBACK + def __init__(self, loop, ssl_protocol): self._loop = loop # SSLProtocol instance @@ -365,6 +367,11 @@ def get_write_buffer_size(self): """Return the current size of the write buffer.""" return self._ssl_protocol._transport.get_write_buffer_size() + @property + def _protocol_paused(self): + # Required for sendfile fallback pause_writing/resume_writing logic + return self._ssl_protocol._transport._protocol_paused + def write(self, data): """Write some data bytes to the transport. diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index fc3b81096dabe4..debab0b15ebf85 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1788,7 +1788,7 @@ def runner(loop): outer_loop.close() -class BaseLoopSendfileTests(test_utils.TestCase): +class BaseLoopSockSendfileTests(test_utils.TestCase): DATA = b"12345abcde" * 16 * 1024 # 160 KiB diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 4140f03967ee6c..0390b8e0d7cb3c 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2090,14 +2090,144 @@ async def connect(cmd=None, **kwds): self.loop.run_until_complete(connect(shell=False)) +class SendfileMixin: + # Note: sendfile via SSL transport is equal to sendfile fallback + + LARGE_DATA = b"12345abcde" * 16 * 1024 * 1024 # 16 MiB + DATA = memoryview(LARGE_DATA)[:160 * 1024] # 160 Kib + + @classmethod + def setUpClass(cls): + with open(support.TESTFN + '.long', 'wb') as fp: + fp.write(cls.LARGE_DATA) + with open(support.TESTFN + '.short', 'wb') as fp: + fp.write(cls.DATA) + super().setUpClass() + + @classmethod + def tearDownClass(cls): + support.unlink(support.TESTFN + '.long') + support.unlink(support.TESTFN + '.short') + super().tearDownClass() + + def setUp(self): + self.sfile = open(support.TESTFN + '.short', 'rb') + self.addCleanup(self.sfile.close) + self.lfile = open(support.TESTFN + '.long', 'rb') + self.addCleanup(self.lfile.close) + super().setUp() + + def run_loop(self, coro): + return self.loop.run_until_complete(coro) + + def prepare(self, is_ssl=False): + port = support.find_unused_port() + srv_proto = MyBaseProto(loop=self.loop) + if is_ssl: + srv_ctx = test_utils.simple_server_sslcontext() + cli_ctx = test_utils.simple_client_sslcontext() + else: + srv_ctx = None + cli_ctx = None + server = self.run_loop(self.loop.create_server( + lambda: srv_proto, support.HOST, port, ssl=srv_ctx)) + + cli_proto = MyBaseProto() + tr, pr = self.run_loop(self.loop.create_connection( + lambda: cli_proto, support.HOST, port, ssl=cli_ctx)) + + def cleanup(): + srv_proto.transport.close() + cli_proto.transport.close() + + server.close() + self.run_loop(server.wait_closed()) + + self.addCleanup(cleanup) + return srv_proto, cli_proto + + def test_sendfile_not_supported(self): + tr, pr = self.run_loop( + self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(loop=self.loop), + family=socket.AF_INET)) + try: + with self.assertRaisesRegex(RuntimeError, "not supported"): + self.run_loop( + self.loop.sendfile(tr, self.sfile)) + self.assertEqual(0, self.sfile.tell()) + finally: + # don't use self.addCleanup because it produces resource warning + tr.close() + + def test_sendfile(self): + srv_proto, cli_proto = self.prepare() + self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(srv_proto.nbytes, len(self.DATA)) + self.assertEqual(self.sfile.tell(), len(self.DATA)) + + def test_sendfile_ssl(self): + srv_proto, cli_proto = self.prepare(is_ssl=True) + self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + self.assertFalse(cli_proto.transport._protocol_paused) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(srv_proto.nbytes, len(self.DATA)) + self.assertEqual(self.sfile.tell(), len(self.DATA)) + + def test_sendfile_large(self): + srv_proto, cli_proto = self.prepare() + self.run_loop(self.loop.sendfile(cli_proto.transport, self.lfile)) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(srv_proto.nbytes, len(self.LARGE_DATA)) + self.assertEqual(self.lfile.tell(), len(self.LARGE_DATA)) + + def test_sendfile_ssl_large(self): + srv_proto, cli_proto = self.prepare(is_ssl=True) + self.run_loop(self.loop.sendfile(cli_proto.transport, self.lfile)) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(srv_proto.nbytes, len(self.LARGE_DATA)) + self.assertEqual(self.lfile.tell(), len(self.LARGE_DATA)) + + def test_sendfile_ssl_already_paused(self): + srv_proto, cli_proto = self.prepare(is_ssl=True) + self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(srv_proto.nbytes, len(self.DATA)) + self.assertEqual(self.sfile.tell(), len(self.DATA)) + + def test_sendfile_ssl_already_paused(self): + # 10 MB is enough to not fit into single sock.send() call + BUF = b'1234567890' * 1024 * 1024 + srv_proto, cli_proto = self.prepare(is_ssl=True) + cli_proto.transport.set_write_buffer_limits(2, 1) + cli_proto.transport.write(BUF) + self.assertTrue(cli_proto.transport._protocol_paused) + self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + # writing is always restored by fallback if was paused + self.assertFalse(cli_proto.transport._protocol_paused) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(srv_proto.nbytes, len(self.DATA)+len(BUF)) + self.assertEqual(self.sfile.tell(), len(self.DATA)) + + if sys.platform == 'win32': - class SelectEventLoopTests(EventLoopTestsMixin, test_utils.TestCase): + class SelectEventLoopTests(EventLoopTestsMixin, + SendfileMixin, + test_utils.TestCase): def create_event_loop(self): return asyncio.SelectorEventLoop() class ProactorEventLoopTests(EventLoopTestsMixin, + SendfileMixin, SubprocessTestsMixin, test_utils.TestCase): @@ -2125,7 +2255,7 @@ def test_remove_fds_after_closing(self): else: import selectors - class UnixEventLoopTestsMixin(EventLoopTestsMixin): + class UnixEventLoopTestsMixin(EventLoopTestsMixin, SendfileMixin): def setUp(self): super().setUp() watcher = asyncio.SafeChildWatcher() @@ -2556,7 +2686,9 @@ async def inner(): with self.assertRaises(NotImplementedError): await loop.sock_accept(f) with self.assertRaises(NotImplementedError): - await loop.sock_sendfile(f, mock.Mock()) + await loop.sock_sendfile(f, f) + with self.assertRaises(NotImplementedError): + await loop.sendfile(f, f) with self.assertRaises(NotImplementedError): await loop.connect_read_pipe(f, mock.sentinel.pipe) with self.assertRaises(NotImplementedError): From 8ebd388a27fdde7c894e48e799499d84f952a0b8 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 22 Jan 2018 18:16:42 +0200 Subject: [PATCH 02/31] Fix notes --- Lib/asyncio/base_events.py | 20 +++++++++++--------- Lib/asyncio/constants.py | 2 +- Lib/asyncio/selector_events.py | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 415159f772a8ea..9d1b4ffcc94e63 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -162,17 +162,17 @@ def __init__(self, transp): # transport should be _FlowControlMixin instance self._transport = transp self._proto = transp.get_protocol() - self._resume_reading = transp.is_reading() - self._write_paused = transp._protocol_paused + self._should_resume_reading = transp.is_reading() + self._should_resume_writing = transp._protocol_paused transp.pause_reading() transp.set_protocol(self) - if self._write_paused: + if self._should_resume_writing: self._paused = self._transport._loop.create_future() else: self._paused = None def connection_made(self, transport): - raise RuntimeError("Broken state, " + raise RuntimeError("Invalid state, " "connection should be established already.") def connection_lost(self, exc): @@ -195,18 +195,18 @@ def resume_writing(self): self._paused = None def data_received(self, data): - raise RuntimeError("Broken state, reading should be paused") + raise RuntimeError("Invalid state, reading should be paused") def eof_received(self): - raise RuntimeError("Broken state, reading should be paused") + raise RuntimeError("Invalid state, reading should be paused") async def restore(self): self._transport.set_protocol(self._proto) - if self._resume_reading: + if self._should_resume_reading: self._transport.resume_reading() if self._paused is not None: await self._paused - if self._write_paused: + if self._should_resume_writing: self._proto.resume_writing() @@ -932,7 +932,7 @@ async def sendfile(self, transport, file, offset=0, count=None, if mode is constants._SendfileMode.UNSUPPORTED: raise RuntimeError( f"sendfile is not supported for transport {transport!r}") - if mode is constants._SendfileMode.NATIVE: + if mode is constants._SendfileMode.TRY_NATIVE: try: await self._sendfile_native(transport, file, offset, count) @@ -941,6 +941,8 @@ async def sendfile(self, transport, file, offset=0, count=None, if not fallback: raise # the mode is FALLBACK or fallback is True + assert (mode is constants._SendfileMode.FALLBACK or mode is + constants._SendfileMode.TRY_NATIVE and fallback), mode await self._sendfile_fallback(transport, file, offset, count) diff --git a/Lib/asyncio/constants.py b/Lib/asyncio/constants.py index d5fed658a58ab4..739b0a70c13e06 100644 --- a/Lib/asyncio/constants.py +++ b/Lib/asyncio/constants.py @@ -18,5 +18,5 @@ # base_events and sslproto class _SendfileMode(enum.Enum): UNSUPPORTED = enum.auto() - NATIVE = enum.auto() + TRY_NATIVE = enum.auto() FALLBACK = enum.auto() diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 19fddad28ec8da..df6937fa9d7b90 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -711,7 +711,7 @@ def get_write_buffer_size(self): class _SelectorSocketTransport(_SelectorTransport): _start_tls_compatible = True - _sendfile_compatible = constants._SendfileMode.NATIVE + _sendfile_compatible = constants._SendfileMode.TRY_NATIVE def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None): From c7194f70e73184d094d2b55ce908882c24bb54e9 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 22 Jan 2018 18:18:55 +0200 Subject: [PATCH 03/31] Add NEWS --- .../NEWS.d/next/Library/2018-01-22-18-18-44.bpo-32622.A1D6FP.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2018-01-22-18-18-44.bpo-32622.A1D6FP.rst diff --git a/Misc/NEWS.d/next/Library/2018-01-22-18-18-44.bpo-32622.A1D6FP.rst b/Misc/NEWS.d/next/Library/2018-01-22-18-18-44.bpo-32622.A1D6FP.rst new file mode 100644 index 00000000000000..d7433fa3cb1229 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-01-22-18-18-44.bpo-32622.A1D6FP.rst @@ -0,0 +1 @@ +Add :meth:`asyncio.AbstractEventLoop.sendfile` method. From 1632e0a0b8cf47cb84aedbaba9e99c2921fdce62 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 22 Jan 2018 19:48:29 +0200 Subject: [PATCH 04/31] Fix error text --- Lib/asyncio/base_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 9d1b4ffcc94e63..f74dfdf967b2ec 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -173,7 +173,7 @@ def __init__(self, transp): def connection_made(self, transport): raise RuntimeError("Invalid state, " - "connection should be established already.") + "connection should have been established already.") def connection_lost(self, exc): if self._paused is not None: From 535be775bb748b73f3e07b65e71039d4beb8763e Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 22 Jan 2018 20:07:39 +0200 Subject: [PATCH 05/31] Raise an exception on sendfile via closing transport --- Lib/asyncio/base_events.py | 2 ++ Lib/test/test_asyncio/test_events.py | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index f74dfdf967b2ec..79033340091ebb 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -927,6 +927,8 @@ async def sendfile(self, transport, file, offset=0, count=None, Return amount of sent bytes. """ + if transport.is_closing(): + raise RuntimeError("Transport is closing") mode = getattr(transport, '_sendfile_compatible', constants._SendfileMode.UNSUPPORTED) if mode is constants._SendfileMode.UNSUPPORTED: diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 0390b8e0d7cb3c..2320eb3119e933 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2216,6 +2216,15 @@ def test_sendfile_ssl_already_paused(self): self.assertEqual(srv_proto.nbytes, len(self.DATA)+len(BUF)) self.assertEqual(self.sfile.tell(), len(self.DATA)) + def test_sendfile_for_closing_transp(self): + srv_proto, cli_proto = self.prepare() + cli_proto.transport.close() + with self.assertRaisesRegex(RuntimeError, "is closing"): + self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + self.run_loop(srv_proto.done) + self.assertEqual(srv_proto.nbytes, 0) + self.assertEqual(self.sfile.tell(), 0) + if sys.platform == 'win32': From bd8c515daa81932b029c3b9e713327b99eff177e Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 23 Jan 2018 13:42:03 +0200 Subject: [PATCH 06/31] Add more tests --- Lib/asyncio/base_events.py | 15 ++-- Lib/test/test_asyncio/test_events.py | 114 +++++++++++++++++++++++++-- 2 files changed, 113 insertions(+), 16 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 79033340091ebb..1908a8176ef99c 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -936,17 +936,16 @@ async def sendfile(self, transport, file, offset=0, count=None, f"sendfile is not supported for transport {transport!r}") if mode is constants._SendfileMode.TRY_NATIVE: try: - await self._sendfile_native(transport, file, - offset, count) - return + return await self._sendfile_native(transport, file, + offset, count) except events.SendfileNotAvailableError as exc: if not fallback: raise # the mode is FALLBACK or fallback is True assert (mode is constants._SendfileMode.FALLBACK or mode is constants._SendfileMode.TRY_NATIVE and fallback), mode - await self._sendfile_fallback(transport, file, - offset, count) + return await self._sendfile_fallback(transport, file, + offset, count) async def _sendfile_native(self, transp, file, offset, count): raise events.SendfileNotAvailableError( @@ -964,16 +963,16 @@ async def _sendfile_fallback(self, transp, file, offset, count): if count: blocksize = min(count - total_sent, blocksize) if blocksize <= 0: - break + return total_sent fut = proto._paused if fut is not None: if await fut: # eof received - return + return total_sent view = memoryview(buf)[:blocksize] read = file.readinto(view) if not read: - break # EOF + return total_sent # EOF transp.write(view) total_sent += read finally: diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 2320eb3119e933..49153af089824b 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -26,6 +26,7 @@ import tty import asyncio +from asyncio import base_events from asyncio import coroutines from asyncio import events from asyncio import proactor_events @@ -2090,6 +2091,17 @@ async def connect(cmd=None, **kwds): self.loop.run_until_complete(connect(shell=False)) +class MySendfileProto(MyBaseProto): + + def __init__(self, loop=None): + super().__init__(loop) + self.data = bytearray() + + def data_received(self, data): + self.data.extend(data) + super().data_received(data) + + class SendfileMixin: # Note: sendfile via SSL transport is equal to sendfile fallback @@ -2122,7 +2134,7 @@ def run_loop(self, coro): def prepare(self, is_ssl=False): port = support.find_unused_port() - srv_proto = MyBaseProto(loop=self.loop) + srv_proto = MySendfileProto(loop=self.loop) if is_ssl: srv_ctx = test_utils.simple_server_sslcontext() cli_ctx = test_utils.simple_client_sslcontext() @@ -2132,7 +2144,7 @@ def prepare(self, is_ssl=False): server = self.run_loop(self.loop.create_server( lambda: srv_proto, support.HOST, port, ssl=srv_ctx)) - cli_proto = MyBaseProto() + cli_proto = MySendfileProto() tr, pr = self.run_loop(self.loop.create_connection( lambda: cli_proto, support.HOST, port, ssl=cli_ctx)) @@ -2162,42 +2174,76 @@ def test_sendfile_not_supported(self): def test_sendfile(self): srv_proto, cli_proto = self.prepare() - self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.sfile)) cli_proto.transport.close() self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.data, self.DATA) + self.assertEqual(srv_proto.nbytes, len(self.DATA)) + self.assertEqual(self.sfile.tell(), len(self.DATA)) + + def test_sendfile_force_fallback(self): + srv_proto, cli_proto = self.prepare() + + def sendfile_native(transp, file, offset, count): + # to raise SendfileNotAvailableError + return base_events.BaseEventLoop._sendfile_native( + self.loop, transp, file, offset, count) + + self.loop._sendfile_native = sendfile_native + + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.sfile)) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.data, self.DATA) self.assertEqual(srv_proto.nbytes, len(self.DATA)) self.assertEqual(self.sfile.tell(), len(self.DATA)) def test_sendfile_ssl(self): srv_proto, cli_proto = self.prepare(is_ssl=True) - self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.sfile)) self.assertFalse(cli_proto.transport._protocol_paused) cli_proto.transport.close() self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.data, self.DATA) self.assertEqual(srv_proto.nbytes, len(self.DATA)) self.assertEqual(self.sfile.tell(), len(self.DATA)) def test_sendfile_large(self): srv_proto, cli_proto = self.prepare() - self.run_loop(self.loop.sendfile(cli_proto.transport, self.lfile)) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.lfile)) cli_proto.transport.close() self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.LARGE_DATA)) + self.assertEqual(srv_proto.data, self.LARGE_DATA) self.assertEqual(srv_proto.nbytes, len(self.LARGE_DATA)) self.assertEqual(self.lfile.tell(), len(self.LARGE_DATA)) def test_sendfile_ssl_large(self): srv_proto, cli_proto = self.prepare(is_ssl=True) - self.run_loop(self.loop.sendfile(cli_proto.transport, self.lfile)) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.lfile)) cli_proto.transport.close() self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.LARGE_DATA)) + self.assertEqual(srv_proto.data, self.LARGE_DATA) self.assertEqual(srv_proto.nbytes, len(self.LARGE_DATA)) self.assertEqual(self.lfile.tell(), len(self.LARGE_DATA)) def test_sendfile_ssl_already_paused(self): srv_proto, cli_proto = self.prepare(is_ssl=True) - self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.sfile)) cli_proto.transport.close() self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.data, self.DATA) self.assertEqual(srv_proto.nbytes, len(self.DATA)) self.assertEqual(self.sfile.tell(), len(self.DATA)) @@ -2208,11 +2254,14 @@ def test_sendfile_ssl_already_paused(self): cli_proto.transport.set_write_buffer_limits(2, 1) cli_proto.transport.write(BUF) self.assertTrue(cli_proto.transport._protocol_paused) - self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.sfile)) # writing is always restored by fallback if was paused self.assertFalse(cli_proto.transport._protocol_paused) cli_proto.transport.close() self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.data, BUF + self.DATA) self.assertEqual(srv_proto.nbytes, len(self.DATA)+len(BUF)) self.assertEqual(self.sfile.tell(), len(self.DATA)) @@ -2225,6 +2274,55 @@ def test_sendfile_for_closing_transp(self): self.assertEqual(srv_proto.nbytes, 0) self.assertEqual(self.sfile.tell(), 0) + def test_sendfile_pre_and_post_data(self): + srv_proto, cli_proto = self.prepare() + PREFIX = b'zxcvbnm' * 1024 * 1024 + SUFFIX = b'0987654321' * 1024 * 1024 + cli_proto.transport.write(PREFIX) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.sfile)) + cli_proto.transport.write(SUFFIX) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX) + self.assertEqual(self.sfile.tell(), len(self.DATA)) + + def test_sendfile_ssl_pre_and_post_data(self): + srv_proto, cli_proto = self.prepare(is_ssl=True) + PREFIX = b'zxcvbnm' * 1024 * 1024 + SUFFIX = b'0987654321' * 1024 * 1024 + cli_proto.transport.write(PREFIX) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.sfile)) + cli_proto.transport.write(SUFFIX) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX) + self.assertEqual(self.sfile.tell(), len(self.DATA)) + + def test_sendfile_partial(self): + srv_proto, cli_proto = self.prepare() + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.sfile, 1000, 100)) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(ret, 100) + self.assertEqual(srv_proto.data, self.DATA[1000:1100]) + self.assertEqual(self.sfile.tell(), 1100) + + def test_sendfile_ssl_partial(self): + srv_proto, cli_proto = self.prepare(is_ssl=True) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.sfile, 1000, 100)) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(ret, 100) + self.assertEqual(srv_proto.data, self.DATA[1000:1100]) + self.assertEqual(self.sfile.tell(), 1100) + + if sys.platform == 'win32': From fab4669b11ee25ccf2380ed44c8a7b6f69c485dc Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 23 Jan 2018 18:10:38 +0200 Subject: [PATCH 07/31] More tests --- Lib/asyncio/base_events.py | 8 +- Lib/test/test_asyncio/test_events.py | 203 +++++++++++++++++---------- 2 files changed, 131 insertions(+), 80 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 1908a8176ef99c..0cc674c3e517b9 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -177,8 +177,10 @@ def connection_made(self, transport): def connection_lost(self, exc): if self._paused is not None: + # never happens if peer disconnects after sending the whole content if exc is None: - self._paused.set_result(True) + self._paused.set_exception( + ConnectionResetError("Connection reset by peer")) else: self._paused.set_exception(exc) self._proto.connection_lost(exc) @@ -966,9 +968,7 @@ async def _sendfile_fallback(self, transp, file, offset, count): return total_sent fut = proto._paused if fut is not None: - if await fut: - # eof received - return total_sent + await fut view = memoryview(buf)[:blocksize] read = file.readinto(view) if not read: diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 49153af089824b..77da319a335e3b 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2093,60 +2093,70 @@ async def connect(cmd=None, **kwds): class MySendfileProto(MyBaseProto): - def __init__(self, loop=None): + def __init__(self, loop=None, close_after=0): super().__init__(loop) self.data = bytearray() + self.close_after = close_after def data_received(self, data): self.data.extend(data) super().data_received(data) + if self.close_after and self.nbytes >= self.close_after: + self.transport.close() class SendfileMixin: # Note: sendfile via SSL transport is equal to sendfile fallback - LARGE_DATA = b"12345abcde" * 16 * 1024 * 1024 # 16 MiB - DATA = memoryview(LARGE_DATA)[:160 * 1024] # 160 Kib + DATA = b"12345abcde" * 160 * 1024 # 160 KiB @classmethod def setUpClass(cls): - with open(support.TESTFN + '.long', 'wb') as fp: - fp.write(cls.LARGE_DATA) - with open(support.TESTFN + '.short', 'wb') as fp: + with open(support.TESTFN, 'wb') as fp: fp.write(cls.DATA) super().setUpClass() @classmethod def tearDownClass(cls): - support.unlink(support.TESTFN + '.long') - support.unlink(support.TESTFN + '.short') + support.unlink(support.TESTFN) super().tearDownClass() def setUp(self): - self.sfile = open(support.TESTFN + '.short', 'rb') - self.addCleanup(self.sfile.close) - self.lfile = open(support.TESTFN + '.long', 'rb') - self.addCleanup(self.lfile.close) + self.file = open(support.TESTFN, 'rb') + self.addCleanup(self.file.close) super().setUp() def run_loop(self, coro): return self.loop.run_until_complete(coro) - def prepare(self, is_ssl=False): + def prepare(self, *, is_ssl=False, close_after=0): port = support.find_unused_port() - srv_proto = MySendfileProto(loop=self.loop) + srv_proto = MySendfileProto(loop=self.loop, close_after=close_after) if is_ssl: srv_ctx = test_utils.simple_server_sslcontext() cli_ctx = test_utils.simple_client_sslcontext() else: srv_ctx = None cli_ctx = None + srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # reduce recv socket buffer size to test on relative small data sets + srv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024) + srv_sock.bind((support.HOST, port)) server = self.run_loop(self.loop.create_server( - lambda: srv_proto, support.HOST, port, ssl=srv_ctx)) + lambda: srv_proto, sock=srv_sock, ssl=srv_ctx)) + if is_ssl: + server_hostname = support.HOST + else: + server_hostname = None + cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # reduce send socket buffer size to test on relative small data sets + cli_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) + cli_sock.connect((support.HOST, port)) cli_proto = MySendfileProto() tr, pr = self.run_loop(self.loop.create_connection( - lambda: cli_proto, support.HOST, port, ssl=cli_ctx)) + lambda: cli_proto, sock=cli_sock, + ssl=cli_ctx, server_hostname=server_hostname)) def cleanup(): srv_proto.transport.close() @@ -2166,8 +2176,8 @@ def test_sendfile_not_supported(self): try: with self.assertRaisesRegex(RuntimeError, "not supported"): self.run_loop( - self.loop.sendfile(tr, self.sfile)) - self.assertEqual(0, self.sfile.tell()) + self.loop.sendfile(tr, self.file)) + self.assertEqual(0, self.file.tell()) finally: # don't use self.addCleanup because it produces resource warning tr.close() @@ -2175,13 +2185,13 @@ def test_sendfile_not_supported(self): def test_sendfile(self): srv_proto, cli_proto = self.prepare() ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.sfile)) + self.loop.sendfile(cli_proto.transport, self.file)) cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.data, self.DATA) self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(self.sfile.tell(), len(self.DATA)) + self.assertEqual(srv_proto.data, self.DATA) + self.assertEqual(self.file.tell(), len(self.DATA)) def test_sendfile_force_fallback(self): srv_proto, cli_proto = self.prepare() @@ -2194,134 +2204,175 @@ def sendfile_native(transp, file, offset, count): self.loop._sendfile_native = sendfile_native ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.sfile)) + self.loop.sendfile(cli_proto.transport, self.file)) cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.data, self.DATA) self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(self.sfile.tell(), len(self.DATA)) - - def test_sendfile_ssl(self): - srv_proto, cli_proto = self.prepare(is_ssl=True) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.sfile)) - self.assertFalse(cli_proto.transport._protocol_paused) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) self.assertEqual(srv_proto.data, self.DATA) - self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(self.sfile.tell(), len(self.DATA)) + self.assertEqual(self.file.tell(), len(self.DATA)) - def test_sendfile_large(self): + def test_sendfile_force_unsupported_native(self): srv_proto, cli_proto = self.prepare() - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.lfile)) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.LARGE_DATA)) - self.assertEqual(srv_proto.data, self.LARGE_DATA) - self.assertEqual(srv_proto.nbytes, len(self.LARGE_DATA)) - self.assertEqual(self.lfile.tell(), len(self.LARGE_DATA)) - def test_sendfile_ssl_large(self): - srv_proto, cli_proto = self.prepare(is_ssl=True) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.lfile)) + def sendfile_native(transp, file, offset, count): + # to raise SendfileNotAvailableError + return base_events.BaseEventLoop._sendfile_native( + self.loop, transp, file, offset, count) + + self.loop._sendfile_native = sendfile_native + + with self.assertRaisesRegex(events.SendfileNotAvailableError, + "not supported"): + self.run_loop( + self.loop.sendfile(cli_proto.transport, self.file, + fallback=False)) cli_proto.transport.close() self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.LARGE_DATA)) - self.assertEqual(srv_proto.data, self.LARGE_DATA) - self.assertEqual(srv_proto.nbytes, len(self.LARGE_DATA)) - self.assertEqual(self.lfile.tell(), len(self.LARGE_DATA)) + self.assertEqual(srv_proto.nbytes, 0) + self.assertEqual(self.file.tell(), 0) - def test_sendfile_ssl_already_paused(self): + def test_sendfile_ssl(self): srv_proto, cli_proto = self.prepare(is_ssl=True) ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.sfile)) + self.loop.sendfile(cli_proto.transport, self.file)) + self.assertFalse(cli_proto.transport._protocol_paused) cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.data, self.DATA) self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(self.sfile.tell(), len(self.DATA)) + self.assertEqual(srv_proto.data, self.DATA) + self.assertEqual(self.file.tell(), len(self.DATA)) def test_sendfile_ssl_already_paused(self): - # 10 MB is enough to not fit into single sock.send() call - BUF = b'1234567890' * 1024 * 1024 + # 30 Kb is enough to not fit into single sock.send() call + BUF = b'1234567890' * 1024 * 3 srv_proto, cli_proto = self.prepare(is_ssl=True) - cli_proto.transport.set_write_buffer_limits(2, 1) + cli_proto.transport.set_write_buffer_limits(0) cli_proto.transport.write(BUF) self.assertTrue(cli_proto.transport._protocol_paused) ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.sfile)) + self.loop.sendfile(cli_proto.transport, self.file)) # writing is always restored by fallback if was paused self.assertFalse(cli_proto.transport._protocol_paused) cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.data, BUF + self.DATA) self.assertEqual(srv_proto.nbytes, len(self.DATA)+len(BUF)) - self.assertEqual(self.sfile.tell(), len(self.DATA)) + self.assertEqual(srv_proto.data, BUF + self.DATA) + self.assertEqual(self.file.tell(), len(self.DATA)) def test_sendfile_for_closing_transp(self): srv_proto, cli_proto = self.prepare() cli_proto.transport.close() with self.assertRaisesRegex(RuntimeError, "is closing"): - self.run_loop(self.loop.sendfile(cli_proto.transport, self.sfile)) + self.run_loop(self.loop.sendfile(cli_proto.transport, self.file)) self.run_loop(srv_proto.done) self.assertEqual(srv_proto.nbytes, 0) - self.assertEqual(self.sfile.tell(), 0) + self.assertEqual(self.file.tell(), 0) def test_sendfile_pre_and_post_data(self): srv_proto, cli_proto = self.prepare() - PREFIX = b'zxcvbnm' * 1024 * 1024 - SUFFIX = b'0987654321' * 1024 * 1024 + PREFIX = b'zxcvbnm' * 1024 + SUFFIX = b'0987654321' * 1024 cli_proto.transport.write(PREFIX) ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.sfile)) + self.loop.sendfile(cli_proto.transport, self.file)) cli_proto.transport.write(SUFFIX) cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, len(self.DATA)) self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX) - self.assertEqual(self.sfile.tell(), len(self.DATA)) + self.assertEqual(self.file.tell(), len(self.DATA)) def test_sendfile_ssl_pre_and_post_data(self): srv_proto, cli_proto = self.prepare(is_ssl=True) - PREFIX = b'zxcvbnm' * 1024 * 1024 - SUFFIX = b'0987654321' * 1024 * 1024 + PREFIX = b'zxcvbnm' * 1024 + SUFFIX = b'0987654321' * 1024 cli_proto.transport.write(PREFIX) ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.sfile)) + self.loop.sendfile(cli_proto.transport, self.file)) cli_proto.transport.write(SUFFIX) cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, len(self.DATA)) self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX) - self.assertEqual(self.sfile.tell(), len(self.DATA)) + self.assertEqual(self.file.tell(), len(self.DATA)) def test_sendfile_partial(self): srv_proto, cli_proto = self.prepare() ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.sfile, 1000, 100)) + self.loop.sendfile(cli_proto.transport, self.file, 1000, 100)) cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, 100) + self.assertEqual(srv_proto.nbytes, 100) self.assertEqual(srv_proto.data, self.DATA[1000:1100]) - self.assertEqual(self.sfile.tell(), 1100) + self.assertEqual(self.file.tell(), 1100) def test_sendfile_ssl_partial(self): srv_proto, cli_proto = self.prepare(is_ssl=True) ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.sfile, 1000, 100)) + self.loop.sendfile(cli_proto.transport, self.file, 1000, 100)) cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, 100) + self.assertEqual(srv_proto.nbytes, 100) self.assertEqual(srv_proto.data, self.DATA[1000:1100]) - self.assertEqual(self.sfile.tell(), 1100) + self.assertEqual(self.file.tell(), 1100) + + def test_sendfile_close_peer_after_receiving(self): + srv_proto, cli_proto = self.prepare(close_after=len(self.DATA)) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.file)) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.nbytes, len(self.DATA)) + self.assertEqual(srv_proto.data, self.DATA) + self.assertEqual(self.file.tell(), len(self.DATA)) + + def test_sendfile_ssl_close_peer_after_receiving(self): + srv_proto, cli_proto = self.prepare(is_ssl=True, + close_after=len(self.DATA)) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.file)) + self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.nbytes, len(self.DATA)) + self.assertEqual(srv_proto.data, self.DATA) + self.assertEqual(self.file.tell(), len(self.DATA)) + def xtest_sendfile_close_peer_in_middle_of_receiving(self): + srv_proto, cli_proto = self.prepare(close_after=1024) + ret = self.run_loop( + self.loop.sendfile(cli_proto.transport, self.file)) + cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.assertEqual(ret, len(self.DATA)) + self.assertEqual(srv_proto.nbytes, len(self.DATA)) + self.assertEqual(srv_proto.data, self.DATA) + self.assertEqual(self.file.tell(), len(self.DATA)) + + def test_sendfile_fallback_close_peer_in_middle_of_receiving(self): + + def sendfile_native(transp, file, offset, count): + # to raise SendfileNotAvailableError + return base_events.BaseEventLoop._sendfile_native( + self.loop, transp, file, offset, count) + + self.loop._sendfile_native = sendfile_native + + srv_proto, cli_proto = self.prepare(close_after=1024) + cli_proto.transport.set_write_buffer_limits(0) + with self.assertRaises(ConnectionResetError): + self.run_loop( + self.loop.sendfile(cli_proto.transport, self.file)) + self.run_loop(srv_proto.done) + self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA), + srv_proto.nbytes) + self.assertTrue(1024 <= self.file.tell() < len(self.DATA), + self.file.tell()) if sys.platform == 'win32': From 2f51d6a6ceb96dc5bdfacb8817c6e83fc9bb27ce Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 23 Jan 2018 18:12:28 +0200 Subject: [PATCH 08/31] Update comment --- Lib/asyncio/base_events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 0cc674c3e517b9..13fcb2d4671f41 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -177,7 +177,8 @@ def connection_made(self, transport): def connection_lost(self, exc): if self._paused is not None: - # never happens if peer disconnects after sending the whole content + # Never happens if peer disconnects after sending the whole content + # Thus disconnection is always an exception from user perspective if exc is None: self._paused.set_exception( ConnectionResetError("Connection reset by peer")) From d36fcdf3ed070630e6cce94bb3d1f049b1b9b6e1 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 23 Jan 2018 18:23:00 +0200 Subject: [PATCH 09/31] More tests --- Lib/test/test_asyncio/test_events.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 77da319a335e3b..897a44ca008d57 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2343,16 +2343,17 @@ def test_sendfile_ssl_close_peer_after_receiving(self): self.assertEqual(srv_proto.data, self.DATA) self.assertEqual(self.file.tell(), len(self.DATA)) - def xtest_sendfile_close_peer_in_middle_of_receiving(self): + def test_sendfile_close_peer_in_middle_of_receiving(self): srv_proto, cli_proto = self.prepare(close_after=1024) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - cli_proto.transport.close() + with self.assertRaises(ConnectionResetError): + self.run_loop( + self.loop.sendfile(cli_proto.transport, self.file)) self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.nbytes, len(self.DATA)) - self.assertEqual(srv_proto.data, self.DATA) - self.assertEqual(self.file.tell(), len(self.DATA)) + + self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA), + srv_proto.nbytes) + self.assertTrue(1024 <= self.file.tell() < len(self.DATA), + self.file.tell()) def test_sendfile_fallback_close_peer_in_middle_of_receiving(self): @@ -2364,11 +2365,11 @@ def sendfile_native(transp, file, offset, count): self.loop._sendfile_native = sendfile_native srv_proto, cli_proto = self.prepare(close_after=1024) - cli_proto.transport.set_write_buffer_limits(0) with self.assertRaises(ConnectionResetError): self.run_loop( self.loop.sendfile(cli_proto.transport, self.file)) self.run_loop(srv_proto.done) + self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA), srv_proto.nbytes) self.assertTrue(1024 <= self.file.tell() < len(self.DATA), From 2ae8bf56120e3aa5ff6f8bb8389b16ff92b184c8 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 23 Jan 2018 21:59:24 +0200 Subject: [PATCH 10/31] Stop file reading early --- Lib/asyncio/base_events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index d6421fb7b85754..394d83a62ef4bf 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -968,13 +968,13 @@ async def _sendfile_fallback(self, transp, file, offset, count): blocksize = min(count - total_sent, blocksize) if blocksize <= 0: return total_sent - fut = proto._paused - if fut is not None: - await fut view = memoryview(buf)[:blocksize] read = file.readinto(view) if not read: return total_sent # EOF + fut = proto._paused + if fut is not None: + await fut transp.write(view) total_sent += read finally: From 6b30924f2c44eaac726150e8c7cbf5099f99a303 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 23 Jan 2018 22:07:32 +0200 Subject: [PATCH 11/31] Revert back, test was failed --- Lib/asyncio/base_events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 394d83a62ef4bf..d6421fb7b85754 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -968,13 +968,13 @@ async def _sendfile_fallback(self, transp, file, offset, count): blocksize = min(count - total_sent, blocksize) if blocksize <= 0: return total_sent + fut = proto._paused + if fut is not None: + await fut view = memoryview(buf)[:blocksize] read = file.readinto(view) if not read: return total_sent # EOF - fut = proto._paused - if fut is not None: - await fut transp.write(view) total_sent += read finally: From d2bb432eb487bb63f54e99ea35f9af6461761d71 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 24 Jan 2018 16:17:32 +0200 Subject: [PATCH 12/31] Clarify empty_waiter possible values --- Lib/asyncio/base_events.py | 2 +- Lib/asyncio/selector_events.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index d6421fb7b85754..fe2b5708564a1c 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -181,7 +181,7 @@ def connection_lost(self, exc): # Thus disconnection is always an exception from user perspective if exc is None: self._paused.set_exception( - ConnectionResetError("Connection reset by peer")) + ConnectionError("Connection is closed by peer")) else: self._paused.set_exception(exc) self._proto.connection_lost(exc) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 60d683313a9192..91983ccd569682 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -820,7 +820,8 @@ def _write_ready(self): if self._conn_lost: if self._empty_waiter is not None: - self._empty_waiter.set_result(True) + self._empty_waiter.set_exception( + ConnectionError("Connection is closed by peer")) return try: n = self._sock.send(self._buffer) @@ -839,7 +840,7 @@ def _write_ready(self): if not self._buffer: self._loop._remove_writer(self._sock_fd) if self._empty_waiter is not None: - self._empty_waiter.set_result(False) + self._empty_waiter.set_result(None) if self._closing: self._call_connection_lost(None) elif self._eof: From 2bf14d34eb82520c138adb8cf1e291c752a7d1da Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 24 Jan 2018 16:30:05 +0200 Subject: [PATCH 13/31] Add docs --- Doc/library/asyncio-eventloop.rst | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 6cee171b65aaea..2d39b795b2d2a5 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -537,6 +537,37 @@ Creating listening connections .. versionadded:: 3.5.3 +File Transferring +----------------- + +.. coroutinemethod:: AbstractEventLoop.sendfile(sock, transport, \ + offset=0, count=None, \ + *, fallback=True) + + Send a *file* to *transport*, return the total number of bytes + which were sent. + + The method uses high-performance :meth:`os.sendfile` if available. + + *file* must be a regular file object opened in binary mode. + + *offset* tells from where to start reading the file. If specified, + *count* is the total number of bytes to transmit as opposed to + sending the file until EOF is reached. File position is updated on + return or also in case of error in which case :meth:`file.tell() + ` can be used to figure out the number of bytes + which were sent. + + *fallback* set to ``True`` makes asyncio to manually read and send + the file when the platform does not support the sendfile syscall + (e.g. Windows or SSL socket on Unix). + + Raise :exc:`SendfileNotAvailableError` if the system does not support + *sendfile* syscall and *fallback* is ``False``. + + .. versionadded:: 3.7 + + TLS Upgrade ----------- From c1ad2f47fde34eda45cda0dce3c0153c9a53dd1b Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 24 Jan 2018 17:46:33 +0200 Subject: [PATCH 14/31] Add a test --- Lib/test/test_asyncio/test_events.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 958baf24e8bd63..b41f9cebc2c2f0 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2373,6 +2373,22 @@ def sendfile_native(transp, file, offset, count): self.assertTrue(1024 <= self.file.tell() < len(self.DATA), self.file.tell()) + def test_sendfile_prevents_bare_write(self): + srv_proto, cli_proto = self.prepare() + fut = self.loop.create_future() + + async def coro(): + fut.set_result(None) + return await self.loop.sendfile(cli_proto.transport, self.file) + + t = self.loop.create_task(coro()) + self.run_loop(fut) + with self.assertRaisesRegex(RuntimeError, + r"loop\.sendfile\(\) is not finished"): + cli_proto.transport.write(b'data') + ret = self.run_loop(t) + self.assertEqual(ret, len(self.DATA)) + if sys.platform == 'win32': From 968c8c592ea860de3fa592f4c26db290cdf2f250 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 24 Jan 2018 23:25:48 +0200 Subject: [PATCH 15/31] Inline variable --- Lib/asyncio/selector_events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 91983ccd569682..4f659b68e1134a 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -546,9 +546,8 @@ async def _sendfile_native(self, transp, file, offset, count): transp.pause_reading() fut = transp._make_empty_waiter() await fut - sock = transp._sock try: - return await self.sock_sendfile(sock, file, offset, count, + return await self.sock_sendfile(transp._sock, file, offset, count, fallback=False) finally: transp._reset_empty_waiter() From b905d1d76b0e261324451bd6d78f009d5dd37fbd Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 24 Jan 2018 23:38:58 +0200 Subject: [PATCH 16/31] Fix error texts --- Lib/asyncio/base_events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index fe2b5708564a1c..0603f0c7bdeae7 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -172,7 +172,7 @@ def __init__(self, transp): self._paused = None def connection_made(self, transport): - raise RuntimeError("Invalid state, " + raise RuntimeError("Invalid state: " "connection should have been established already.") def connection_lost(self, exc): @@ -198,10 +198,10 @@ def resume_writing(self): self._paused = None def data_received(self, data): - raise RuntimeError("Invalid state, reading should be paused") + raise RuntimeError("Invalid state: reading should be paused") def eof_received(self): - raise RuntimeError("Invalid state, reading should be paused") + raise RuntimeError("Invalid state: reading should be paused") async def restore(self): self._transport.set_protocol(self._proto) From e5f24b6b969ddee94e2d7cbed61d36328c56fa85 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 26 Jan 2018 18:02:13 +0200 Subject: [PATCH 17/31] _SenfileProtocol -> _SendfileFallbackProtocol --- Lib/asyncio/base_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index ba158db6cb1980..91657ec88ca943 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -157,7 +157,7 @@ def _run_until_complete_cb(fut): -class _SendfileProtocol(protocols.Protocol): +class _SendfileFallbackProtocol(protocols.Protocol): def __init__(self, transp): # transport should be _FlowControlMixin instance self._transport = transp @@ -1020,7 +1020,7 @@ async def _sendfile_fallback(self, transp, file, offset, count): blocksize = min(count, 16384) if count else 16384 buf = bytearray(blocksize) total_sent = 0 - proto = _SendfileProtocol(transp) + proto = _SendfileFallbackProtocol(transp) try: while True: if count: From bc458b3738de3dbcaffb9c53a938aacb19a62cc5 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 26 Jan 2018 18:03:40 +0200 Subject: [PATCH 18/31] Raise an exception for bad transport type --- Lib/asyncio/base_events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 91657ec88ca943..91762109f8c3cd 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -159,7 +159,8 @@ def _run_until_complete_cb(fut): class _SendfileFallbackProtocol(protocols.Protocol): def __init__(self, transp): - # transport should be _FlowControlMixin instance + if not isinstance(transp, transports._FlowControlMixin): + raise TypeError("transport should be _FlowControlMixin instance") self._transport = transp self._proto = transp.get_protocol() self._should_resume_reading = transp.is_reading() From e1a173ea3804ffaa114e43871041c0b66b3c89bb Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 26 Jan 2018 18:04:59 +0200 Subject: [PATCH 19/31] Drop assert --- Lib/asyncio/base_events.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 91762109f8c3cd..fb06d0b0db5927 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -1006,8 +1006,6 @@ async def sendfile(self, transport, file, offset=0, count=None, if not fallback: raise # the mode is FALLBACK or fallback is True - assert (mode is constants._SendfileMode.FALLBACK or mode is - constants._SendfileMode.TRY_NATIVE and fallback), mode return await self._sendfile_fallback(transport, file, offset, count) From 032bd400377356c1ee92bd96b3cae65c366d4475 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 26 Jan 2018 18:08:14 +0200 Subject: [PATCH 20/31] Tune exception text --- Lib/asyncio/selector_events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 4f659b68e1134a..7b3063aeb292f8 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -783,8 +783,7 @@ def write(self, data): if self._eof: raise RuntimeError('Cannot call write() after write_eof()') if self._empty_waiter is not None: - raise RuntimeError('Cannot call write() when loop.sendfile() ' - 'is not finished') + raise RuntimeError('unable to write; sendfile is in progress') if not data: return From b466ed30751416e19b5dc0f376261554ab23ae9a Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 26 Jan 2018 18:26:22 +0200 Subject: [PATCH 21/31] Cancel sendfile waiter on transport closing --- Lib/asyncio/base_events.py | 1 + Lib/asyncio/selector_events.py | 9 ++++++--- Lib/test/test_asyncio/test_events.py | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index fb06d0b0db5927..c4085211fece1e 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -41,6 +41,7 @@ from . import protocols from . import sslproto from . import tasks +from . import transports from .log import logger diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 7b3063aeb292f8..2ec4524b0e54f1 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -817,9 +817,6 @@ def _write_ready(self): assert self._buffer, 'Data should not be empty' if self._conn_lost: - if self._empty_waiter is not None: - self._empty_waiter.set_exception( - ConnectionError("Connection is closed by peer")) return try: n = self._sock.send(self._buffer) @@ -854,6 +851,12 @@ def write_eof(self): def can_write_eof(self): return True + def _call_connection_lost(self, exc): + super()._call_connection_lost(exc) + if self._empty_waiter is not None: + self._empty_waiter.set_exception( + ConnectionError("Connection is closed by peer")) + def _make_empty_waiter(self): if self._empty_waiter is not None: raise RuntimeError("Empty waiter is already set") diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index b41f9cebc2c2f0..b15625e6429cb5 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2384,7 +2384,7 @@ async def coro(): t = self.loop.create_task(coro()) self.run_loop(fut) with self.assertRaisesRegex(RuntimeError, - r"loop\.sendfile\(\) is not finished"): + "sendfile is in progress"): cli_proto.transport.write(b'data') ret = self.run_loop(t) self.assertEqual(ret, len(self.DATA)) From 359181d961fd61aa87c632feb482befecf99e2c3 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 26 Jan 2018 19:01:15 +0200 Subject: [PATCH 22/31] Extract drain() method --- Lib/asyncio/base_events.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index c4085211fece1e..095fa5a8fcffca 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -169,35 +169,41 @@ def __init__(self, transp): transp.pause_reading() transp.set_protocol(self) if self._should_resume_writing: - self._paused = self._transport._loop.create_future() + self._write_ready_fut = self._transport._loop.create_future() else: - self._paused = None + self._write_ready_fut = None + + async def drain(self): + fut = self._write_ready_fut + if fut is None: + return + await fut def connection_made(self, transport): raise RuntimeError("Invalid state: " "connection should have been established already.") def connection_lost(self, exc): - if self._paused is not None: + if self._write_ready_fut is not None: # Never happens if peer disconnects after sending the whole content # Thus disconnection is always an exception from user perspective if exc is None: - self._paused.set_exception( + self._write_ready_fut.set_exception( ConnectionError("Connection is closed by peer")) else: - self._paused.set_exception(exc) + self._write_ready_fut.set_exception(exc) self._proto.connection_lost(exc) def pause_writing(self): - if self._paused is not None: + if self._write_ready_fut is not None: return - self._paused = self._transport._loop.create_future() + self._write_ready_fut = self._transport._loop.create_future() def resume_writing(self): - if self._paused is None: + if self._write_ready_fut is None: return - self._paused.set_result(False) - self._paused = None + self._write_ready_fut.set_result(False) + self._write_ready_fut = None def data_received(self, data): raise RuntimeError("Invalid state: reading should be paused") @@ -209,8 +215,8 @@ async def restore(self): self._transport.set_protocol(self._proto) if self._should_resume_reading: self._transport.resume_reading() - if self._paused is not None: - await self._paused + if self._write_ready_fut is not None: + await self._write_ready_fut if self._should_resume_writing: self._proto.resume_writing() @@ -1027,9 +1033,7 @@ async def _sendfile_fallback(self, transp, file, offset, count): blocksize = min(count - total_sent, blocksize) if blocksize <= 0: return total_sent - fut = proto._paused - if fut is not None: - await fut + await proto.drain() view = memoryview(buf)[:blocksize] read = file.readinto(view) if not read: From 11694072f3cb41c69a4a3126c0f46ef502649e2d Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Jan 2018 12:31:29 +0200 Subject: [PATCH 23/31] Fix sendfile tests --- Lib/asyncio/base_events.py | 7 +++++-- Lib/test/test_asyncio/test_events.py | 19 ------------------- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 095fa5a8fcffca..7fd797caaa5d00 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -216,7 +216,10 @@ async def restore(self): if self._should_resume_reading: self._transport.resume_reading() if self._write_ready_fut is not None: - await self._write_ready_fut + # Cancel the future. + # Basically it has no effect because protocol is switched back, + # no code should wait for it anymore. + self._write_ready_fut.cancel() if self._should_resume_writing: self._proto.resume_writing() @@ -1033,11 +1036,11 @@ async def _sendfile_fallback(self, transp, file, offset, count): blocksize = min(count - total_sent, blocksize) if blocksize <= 0: return total_sent - await proto.drain() view = memoryview(buf)[:blocksize] read = file.readinto(view) if not read: return total_sent # EOF + await proto.drain() transp.write(view) total_sent += read finally: diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index b15625e6429cb5..c1241366692aa4 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2234,7 +2234,6 @@ def test_sendfile_ssl(self): srv_proto, cli_proto = self.prepare(is_ssl=True) ret = self.run_loop( self.loop.sendfile(cli_proto.transport, self.file)) - self.assertFalse(cli_proto.transport._protocol_paused) cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, len(self.DATA)) @@ -2242,24 +2241,6 @@ def test_sendfile_ssl(self): self.assertEqual(srv_proto.data, self.DATA) self.assertEqual(self.file.tell(), len(self.DATA)) - def test_sendfile_ssl_already_paused(self): - # 30 Kb is enough to not fit into single sock.send() call - BUF = b'1234567890' * 1024 * 3 - srv_proto, cli_proto = self.prepare(is_ssl=True) - cli_proto.transport.set_write_buffer_limits(0) - cli_proto.transport.write(BUF) - self.assertTrue(cli_proto.transport._protocol_paused) - ret = self.run_loop( - self.loop.sendfile(cli_proto.transport, self.file)) - # writing is always restored by fallback if was paused - self.assertFalse(cli_proto.transport._protocol_paused) - cli_proto.transport.close() - self.run_loop(srv_proto.done) - self.assertEqual(ret, len(self.DATA)) - self.assertEqual(srv_proto.nbytes, len(self.DATA)+len(BUF)) - self.assertEqual(srv_proto.data, BUF + self.DATA) - self.assertEqual(self.file.tell(), len(self.DATA)) - def test_sendfile_for_closing_transp(self): srv_proto, cli_proto = self.prepare() cli_proto.transport.close() From 312e2c51a4d8be8d98fdf1975f38f7bea5ac3a9d Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Jan 2018 12:57:49 +0200 Subject: [PATCH 24/31] Update docstring --- Lib/asyncio/base_events.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 7fd797caaa5d00..a35a8f402170a8 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -997,9 +997,27 @@ async def _create_connection_transport( async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True): - """Send a file through a transport. + """Send a file to transport. - Return amount of sent bytes. + Return the total number of bytes which were sent. + + The method uses high-performance os.sendfile if available. + + file must be a regular file object opened in binary mode. + + offset tells from where to start reading the file. If specified, + count is the total number of bytes to transmit as opposed to + sending the file until EOF is reached. File position is updated on + return or also in case of error in which case file.tell() + can be used to figure out the number of bytes + which were sent. + + fallback set to True makes asyncio to manually read and send + the file when the platform does not support the sendfile syscall + (e.g. Windows or SSL socket on Unix). + + Raise SendfileNotAvailableError if the system does not support + sendfile syscall and fallback is False. """ if transport.is_closing(): raise RuntimeError("Transport is closing") From cdc898ddf4ecc828e145442bcf0952de22d593e7 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Jan 2018 13:05:48 +0200 Subject: [PATCH 25/31] Simplify code --- Lib/asyncio/selector_events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 2ec4524b0e54f1..5956f2d993e5e1 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -544,8 +544,7 @@ async def _sendfile_native(self, transp, file, offset, count): del self._transports[transp._sock_fd] resume_reading = transp.is_reading() transp.pause_reading() - fut = transp._make_empty_waiter() - await fut + await transp._make_empty_waiter() try: return await self.sock_sendfile(transp._sock, file, offset, count, fallback=False) From df556f2090c12025ee167e2d0d1efb7b6ec261a4 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Jan 2018 13:47:37 +0200 Subject: [PATCH 26/31] Mark proactor socket transport as sendfile-fallback --- Lib/asyncio/proactor_events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index ab1285b7999a3c..ca9b2995ba9abc 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -345,6 +345,8 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport, transports.Transport): """Transport for connected sockets.""" + _sendfile_compatible = constants._SendfileMode.FALLBACK + def _set_extra(self, sock): self._extra['socket'] = sock From 5a9d05657409a7e38321c20fc70ff7a77795d594 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Jan 2018 17:40:01 +0200 Subject: [PATCH 27/31] Cleanup more resources in tests --- Lib/test/test_asyncio/test_base_events.py | 7 +++++++ Lib/test/test_asyncio/test_events.py | 4 +++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 436a0eec2dab36..ab6560c70b9031 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1799,9 +1799,11 @@ def __init__(self, loop): self.closed = False self.data = bytearray() self.fut = loop.create_future() + self.transport = None def connection_made(self, transport): self.started = True + self.transport = transport def data_received(self, data): self.data.extend(data) @@ -1809,6 +1811,7 @@ def data_received(self, data): def connection_lost(self, exc): self.closed = True self.fut.set_result(None) + self.transport = None async def wait_closed(self): await self.fut @@ -1853,6 +1856,10 @@ def prepare(self): def cleanup(): server.close() self.run_loop(server.wait_closed()) + sock.close() + if proto.transport is not None: + proto.transport.close() + self.run_loop(proto.wait_closed()) self.addCleanup(cleanup) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index c1241366692aa4..c18897f98722cb 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2151,7 +2151,7 @@ def prepare(self, *, is_ssl=False, close_after=0): # reduce send socket buffer size to test on relative small data sets cli_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) cli_sock.connect((support.HOST, port)) - cli_proto = MySendfileProto() + cli_proto = MySendfileProto(loop=self.loop) tr, pr = self.run_loop(self.loop.create_connection( lambda: cli_proto, sock=cli_sock, ssl=cli_ctx, server_hostname=server_hostname)) @@ -2159,6 +2159,8 @@ def prepare(self, *, is_ssl=False, close_after=0): def cleanup(): srv_proto.transport.close() cli_proto.transport.close() + self.run_loop(srv_proto.done) + self.run_loop(cli_proto.done) server.close() self.run_loop(server.wait_closed()) From 7f99184dc57ebde9cb42b63fec269ea82abb62a9 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Jan 2018 17:48:31 +0200 Subject: [PATCH 28/31] Cleanup more resources in tests --- Lib/test/test_asyncio/test_events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index c18897f98722cb..f19c65c2f00144 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2186,6 +2186,7 @@ def test_sendfile(self): srv_proto, cli_proto = self.prepare() ret = self.run_loop( self.loop.sendfile(cli_proto.transport, self.file)) + import pdb;pdb.set_trace() cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, len(self.DATA)) From b8f2c1ae7371efb819e4ef1541f4a1b024065e2a Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Jan 2018 18:01:12 +0200 Subject: [PATCH 29/31] Fix a bug in reading from not ready future --- Lib/asyncio/proactor_events.py | 4 +++- Lib/test/test_asyncio/test_events.py | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index ca9b2995ba9abc..f99f1558eb133d 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -180,7 +180,9 @@ def _loop_reading(self, fut=None): assert self._read_fut is fut or (self._read_fut is None and self._closing) self._read_fut = None - data = fut.result() # deliver data later in "finally" clause + if fut.done(): + # deliver data later in "finally" clause + data = fut.result() if self._closing: # since close() has been called we ignore any read data diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index f19c65c2f00144..c18897f98722cb 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2186,7 +2186,6 @@ def test_sendfile(self): srv_proto, cli_proto = self.prepare() ret = self.run_loop( self.loop.sendfile(cli_proto.transport, self.file)) - import pdb;pdb.set_trace() cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(ret, len(self.DATA)) From 4224bde73121443c0257460d237d271080e50587 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Jan 2018 20:11:25 +0200 Subject: [PATCH 30/31] Fix proactor support --- Lib/asyncio/base_events.py | 2 ++ Lib/asyncio/proactor_events.py | 3 +++ Lib/asyncio/windows_events.py | 9 ++++++--- Lib/test/test_asyncio/test_events.py | 10 ++++++++-- Modules/overlapped.c | 1 + 5 files changed, 20 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index a35a8f402170a8..f532dc42132700 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -174,6 +174,8 @@ def __init__(self, transp): self._write_ready_fut = None async def drain(self): + if self._transport.is_closing(): + raise ConnectionError("Connection closed by peer") fut = self._write_ready_fut if fut is None: return diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index f99f1558eb133d..6d27e532387577 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -183,6 +183,9 @@ def _loop_reading(self, fut=None): if fut.done(): # deliver data later in "finally" clause data = fut.result() + else: + # the future will be replaced by next proactor.recv call + fut.cancel() if self._closing: # since close() has been called we ignore any read data diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 890fce8b4050c5..f91fcddb2aad32 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -425,7 +425,8 @@ def finish_recv(trans, key, ov): try: return ov.getresult() except OSError as exc: - if exc.winerror == _overlapped.ERROR_NETNAME_DELETED: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: raise @@ -447,7 +448,8 @@ def finish_recv(trans, key, ov): try: return ov.getresult() except OSError as exc: - if exc.winerror == _overlapped.ERROR_NETNAME_DELETED: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: raise @@ -466,7 +468,8 @@ def finish_send(trans, key, ov): try: return ov.getresult() except OSError as exc: - if exc.winerror == _overlapped.ERROR_NETNAME_DELETED: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: raise diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index c18897f98722cb..091a2670c66b79 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2168,6 +2168,7 @@ def cleanup(): self.addCleanup(cleanup) return srv_proto, cli_proto + @unittest.skipIf(sys.platform == 'win32', "UDP sockets are not supported") def test_sendfile_not_supported(self): tr, pr = self.run_loop( self.loop.create_datagram_endpoint( @@ -2213,6 +2214,8 @@ def sendfile_native(transp, file, offset, count): self.assertEqual(self.file.tell(), len(self.DATA)) def test_sendfile_force_unsupported_native(self): + if isinstance(self.loop, asyncio.ProactorEventLoop): + self.skipTest("Fails on proactor event loop") srv_proto, cli_proto = self.prepare() def sendfile_native(transp, file, offset, count): @@ -2227,6 +2230,7 @@ def sendfile_native(transp, file, offset, count): self.run_loop( self.loop.sendfile(cli_proto.transport, self.file, fallback=False)) + cli_proto.transport.close() self.run_loop(srv_proto.done) self.assertEqual(srv_proto.nbytes, 0) @@ -2326,7 +2330,7 @@ def test_sendfile_ssl_close_peer_after_receiving(self): def test_sendfile_close_peer_in_middle_of_receiving(self): srv_proto, cli_proto = self.prepare(close_after=1024) - with self.assertRaises(ConnectionResetError): + with self.assertRaises(ConnectionError): self.run_loop( self.loop.sendfile(cli_proto.transport, self.file)) self.run_loop(srv_proto.done) @@ -2346,7 +2350,7 @@ def sendfile_native(transp, file, offset, count): self.loop._sendfile_native = sendfile_native srv_proto, cli_proto = self.prepare(close_after=1024) - with self.assertRaises(ConnectionResetError): + with self.assertRaises(ConnectionError): self.run_loop( self.loop.sendfile(cli_proto.transport, self.file)) self.run_loop(srv_proto.done) @@ -2356,6 +2360,8 @@ def sendfile_native(transp, file, offset, count): self.assertTrue(1024 <= self.file.tell() < len(self.DATA), self.file.tell()) + @unittest.skipIf(not hasattr(os, 'sendfile'), + "Don't have native sendfile support") def test_sendfile_prevents_bare_write(self): srv_proto, cli_proto = self.prepare() fut = self.loop.create_future() diff --git a/Modules/overlapped.c b/Modules/overlapped.c index e66e856684054c..447a337fdd1fc3 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1436,6 +1436,7 @@ PyInit__overlapped(void) WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING); WINAPI_CONSTANT(F_DWORD, ERROR_NETNAME_DELETED); + WINAPI_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED); WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); WINAPI_CONSTANT(F_DWORD, INFINITE); From 3975b5ab5760a478758d9a684211f3bdf87409ff Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 27 Jan 2018 20:32:39 +0200 Subject: [PATCH 31/31] Skip windows specific tests on Unix --- Lib/test/test_asyncio/test_events.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 091a2670c66b79..087fb290da203b 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2214,8 +2214,9 @@ def sendfile_native(transp, file, offset, count): self.assertEqual(self.file.tell(), len(self.DATA)) def test_sendfile_force_unsupported_native(self): - if isinstance(self.loop, asyncio.ProactorEventLoop): - self.skipTest("Fails on proactor event loop") + if sys.platform == 'win32': + if isinstance(self.loop, asyncio.ProactorEventLoop): + self.skipTest("Fails on proactor event loop") srv_proto, cli_proto = self.prepare() def sendfile_native(transp, file, offset, count):