Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ Future
If the future is already done when this method is called, raises
:exc:`InvalidStateError`.

.. method:: get_loop()

Return the event loop the future object is bound to.

.. versionadded:: 3.7


Example: Future with run_until_complete()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
2 changes: 1 addition & 1 deletion Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def _run_until_complete_cb(fut):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
fut._loop.stop()
futures._get_loop(fut).stop()


class Server(events.AbstractServer):
Expand Down
20 changes: 18 additions & 2 deletions Lib/asyncio/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def __del__(self):
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)

def get_loop(self):
"""Return the event loop the Future is bound to."""
return self._loop

def cancel(self):
"""Cancel the future and schedule callbacks.

Expand Down Expand Up @@ -249,6 +253,18 @@ def __iter__(self):
_PyFuture = Future


def _get_loop(fut):
# Tries to call Future.get_loop() if it's available.
# Otherwise fallbacks to using the old '_loop' property.
try:
get_loop = fut.get_loop
except AttributeError:
pass
else:
return get_loop()
return fut._loop


def _set_result_unless_cancelled(fut, result):
"""Helper setting the result only if the future was not cancelled."""
if fut.cancelled():
Expand Down Expand Up @@ -304,8 +320,8 @@ def _chain_future(source, destination):
if not isfuture(destination) and not isinstance(destination,
concurrent.futures.Future):
raise TypeError('A future is required for destination argument')
source_loop = source._loop if isfuture(source) else None
dest_loop = destination._loop if isfuture(destination) else None
source_loop = _get_loop(source) if isfuture(source) else None
dest_loop = _get_loop(destination) if isfuture(destination) else None

def _set_state(future, other):
if isfuture(future):
Expand Down
39 changes: 17 additions & 22 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_event_loop()
return {t for t, l in _all_tasks.items() if l is loop}
return {t for t in _all_tasks if futures._get_loop(t) is loop}


class Task(futures.Future):
Expand Down Expand Up @@ -96,7 +96,7 @@ def __init__(self, coro, *, loop=None):
self._coro = coro

self._loop.call_soon(self._step)
_register_task(self._loop, self)
_register_task(self)

def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending:
Expand Down Expand Up @@ -215,7 +215,7 @@ def _step(self, exc=None):
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# Yielded Future must come from Future.__iter__().
if result._loop is not self._loop:
if futures._get_loop(result) is not self._loop:
new_exc = RuntimeError(
f'Task {self!r} got Future '
f'{result!r} attached to a different loop')
Expand Down Expand Up @@ -510,9 +510,9 @@ async def sleep(delay, result=None, *, loop=None):
if loop is None:
loop = events.get_event_loop()
future = loop.create_future()
h = future._loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
try:
return await future
finally:
Expand All @@ -525,7 +525,7 @@ def ensure_future(coro_or_future, *, loop=None):
If the argument is a Future, it is returned directly.
"""
if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
Expand Down Expand Up @@ -655,7 +655,7 @@ def _done_callback(fut):
if arg not in arg_to_fut:
fut = ensure_future(arg, loop=loop)
if loop is None:
loop = fut._loop
loop = futures._get_loop(fut)
if fut is not arg:
# 'arg' was not a Future, therefore, 'fut' is a new
# Future created specifically for 'arg'. Since the caller
Expand Down Expand Up @@ -707,7 +707,7 @@ def shield(arg, *, loop=None):
if inner.done():
# Shortcut.
return inner
loop = inner._loop
loop = futures._get_loop(inner)
outer = loop.create_future()

def _done_callback(inner):
Expand Down Expand Up @@ -751,23 +751,17 @@ def callback():
return future


# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
# Task should be a weak reference to remove entry on task garbage
# collection, EventLoop is required
# to not access to private task._loop attribute.
_all_tasks = weakref.WeakKeyDictionary()
# WeakSet containing all alive tasks.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
_current_tasks = {}


def _register_task(loop, task):
"""Register a new task in asyncio as executed by loop.

Returns None.
"""
_all_tasks[task] = loop
def _register_task(task):
"""Register a new task in asyncio as executed by loop."""
_all_tasks.add(task)


def _enter_task(loop, task):
Expand All @@ -786,8 +780,9 @@ def _leave_task(loop, task):
del _current_tasks[loop]


def _unregister_task(loop, task):
_all_tasks.pop(task, None)
def _unregister_task(task):
"""Unregister a task."""
_all_tasks.discard(task)


_py_register_task = _register_task
Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_asyncio/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def test_init_constructor_default_loop(self):
asyncio.set_event_loop(self.loop)
f = self._new_future()
self.assertIs(f._loop, self.loop)
self.assertIs(f.get_loop(), self.loop)

def test_constructor_positional(self):
# Make sure Future doesn't accept a positional argument
Expand Down
13 changes: 8 additions & 5 deletions Lib/test/test_asyncio/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def notmuch():
self.assertTrue(t.done())
self.assertEqual(t.result(), 'ok')
self.assertIs(t._loop, self.loop)
self.assertIs(t.get_loop(), self.loop)

loop = asyncio.new_event_loop()
self.set_event_loop(loop)
Expand Down Expand Up @@ -2310,10 +2311,11 @@ class BaseTaskIntrospectionTests:
def test__register_task(self):
task = mock.Mock()
loop = mock.Mock()
task.get_loop = lambda: loop
self.assertEqual(asyncio.all_tasks(loop), set())
self._register_task(loop, task)
self._register_task(task)
self.assertEqual(asyncio.all_tasks(loop), {task})
self._unregister_task(loop, task)
self._unregister_task(task)

def test__enter_task(self):
task = mock.Mock()
Expand Down Expand Up @@ -2360,14 +2362,15 @@ def test__leave_task_failure2(self):
def test__unregister_task(self):
task = mock.Mock()
loop = mock.Mock()
self._register_task(loop, task)
self._unregister_task(loop, task)
task.get_loop = lambda: loop
self._register_task(task)
self._unregister_task(task)
self.assertEqual(asyncio.all_tasks(loop), set())

def test__unregister_task_not_registered(self):
task = mock.Mock()
loop = mock.Mock()
self._unregister_task(loop, task)
self._unregister_task(task)
self.assertEqual(asyncio.all_tasks(loop), set())


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
asyncio: Add Task.get_loop() and Future.get_loop()
Loading