-
-
Notifications
You must be signed in to change notification settings - Fork 34.5k
bpo-32622: Implement loop.sendfile() #5271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fa07b35
8ebd388
c7194f7
1632e0a
535be77
bd8c515
fab4669
2f51d6a
d36fcdf
defc61c
2ae8bf5
6b30924
9271eb1
d2bb432
2bf14d3
c1ad2f4
57f02cf
968c8c5
b905d1d
2969cf4
a9af6c7
e5f24b6
bc458b3
e1a173e
032bd40
b466ed3
359181d
1169407
312e2c5
cdc898d
df556f2
5a9d056
7f99184
b8f2c1a
4224bde
3975b5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -540,6 +540,20 @@ def _sock_accept(self, fut, registered, sock): | |
| else: | ||
| fut.set_result((conn, address)) | ||
|
|
||
| async def _sendfile_native(self, transp, file, offset, count): | ||
| del self._transports[transp._sock_fd] | ||
| resume_reading = transp.is_reading() | ||
| transp.pause_reading() | ||
| await transp._make_empty_waiter() | ||
| try: | ||
| return await self.sock_sendfile(transp._sock, file, offset, count, | ||
| fallback=False) | ||
| finally: | ||
| transp._reset_empty_waiter() | ||
| if resume_reading: | ||
| transp.resume_reading() | ||
| self._transports[transp._sock_fd] = transp | ||
|
|
||
| def _process_events(self, event_list): | ||
| for key, mask in event_list: | ||
| fileobj, (reader, writer) = key.fileobj, key.data | ||
|
|
@@ -695,12 +709,14 @@ def get_write_buffer_size(self): | |
| class _SelectorSocketTransport(_SelectorTransport): | ||
|
|
||
| _start_tls_compatible = True | ||
| _sendfile_compatible = constants._SendfileMode.TRY_NATIVE | ||
|
|
||
| def __init__(self, loop, sock, protocol, waiter=None, | ||
| extra=None, server=None): | ||
| super().__init__(loop, sock, protocol, extra, server) | ||
| self._eof = False | ||
| self._paused = False | ||
| self._empty_waiter = None | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This case is still isn't handled.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cancelling is confusing.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whatever, just make sure that if something awaits on |
||
|
|
||
| # Disable the Nagle algorithm -- small writes will be | ||
| # sent without waiting for the TCP ACK. This generally | ||
|
|
@@ -765,6 +781,8 @@ def write(self, data): | |
| f'not {type(data).__name__!r}') | ||
| if self._eof: | ||
| raise RuntimeError('Cannot call write() after write_eof()') | ||
| if self._empty_waiter is not None: | ||
| raise RuntimeError('unable to write; sendfile is in progress') | ||
| if not data: | ||
| return | ||
|
|
||
|
|
@@ -807,12 +825,16 @@ def _write_ready(self): | |
| self._loop._remove_writer(self._sock_fd) | ||
| self._buffer.clear() | ||
| self._fatal_error(exc, 'Fatal write error on socket transport') | ||
| if self._empty_waiter is not None: | ||
| self._empty_waiter.set_exception(exc) | ||
| else: | ||
| if n: | ||
| del self._buffer[:n] | ||
| self._maybe_resume_protocol() # May append to buffer. | ||
| if not self._buffer: | ||
| self._loop._remove_writer(self._sock_fd) | ||
| if self._empty_waiter is not None: | ||
| self._empty_waiter.set_result(None) | ||
| if self._closing: | ||
| self._call_connection_lost(None) | ||
| elif self._eof: | ||
|
|
@@ -828,6 +850,23 @@ def write_eof(self): | |
| def can_write_eof(self): | ||
| return True | ||
|
|
||
| def _call_connection_lost(self, exc): | ||
| super()._call_connection_lost(exc) | ||
| if self._empty_waiter is not None: | ||
| self._empty_waiter.set_exception( | ||
| ConnectionError("Connection is closed by peer")) | ||
|
|
||
| def _make_empty_waiter(self): | ||
| if self._empty_waiter is not None: | ||
| raise RuntimeError("Empty waiter is already set") | ||
| self._empty_waiter = self._loop.create_future() | ||
| if not self._buffer: | ||
| self._empty_waiter.set_result(None) | ||
| return self._empty_waiter | ||
|
|
||
| def _reset_empty_waiter(self): | ||
| self._empty_waiter = None | ||
|
|
||
|
|
||
| class _SelectorDatagramTransport(_SelectorTransport): | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like a wrong place to define an enum. Maybe base_events?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment above about circular deps.
sslproto requires the enum, base_events uses it too.
Moreover they do import each other.
It's a bad smell but I don't want to fix imports in this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.