Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,21 @@ with the result.
Task
----

.. function:: create_task(coro)

Wrap a :ref:`coroutine <coroutine>` *coro* into a task and schedule
its execution. Return the task object.

The task is executed in :func:`get_running_loop` context,
:exc:`RuntimeError` is raised if there is no running loop in
current thread.

.. versionadded:: 3.7

.. class:: Task(coro, \*, loop=None)

Schedule the execution of a :ref:`coroutine <coroutine>`: wrap it in a
future. A task is a subclass of :class:`Future`.
A unit for concurrent running of :ref:`coroutines <coroutine>`,
subclass of :class:`Future`.

A task is responsible for executing a coroutine object in an event loop. If
the wrapped coroutine yields from a future, the task suspends the execution
Expand All @@ -386,7 +397,7 @@ Task
<coroutine>` did not complete. It is probably a bug and a warning is
logged: see :ref:`Pending task destroyed <asyncio-pending-task-destroyed>`.

Don't directly create :class:`Task` instances: use the :func:`ensure_future`
Don't directly create :class:`Task` instances: use the :func:`create_task`
function or the :meth:`AbstractEventLoop.create_task` method.

This class is :ref:`not thread safe <asyncio-multithreading>`.
Expand Down Expand Up @@ -534,9 +545,15 @@ Task functions
.. versionchanged:: 3.5.1
The function accepts any :term:`awaitable` object.

.. note::

:func:`create_task` (added in Python 3.7) is the preferable way
for spawning new tasks.

.. seealso::

The :meth:`AbstractEventLoop.create_task` method.
The :func:`create_task` function and
:meth:`AbstractEventLoop.create_task` method.

.. function:: wrap_future(future, \*, loop=None)

Expand Down
4 changes: 2 additions & 2 deletions Lib/asyncio/base_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import concurrent.futures._base
import reprlib

from . import events
from . import format_helpers

Error = concurrent.futures._base.Error
CancelledError = concurrent.futures.CancelledError
Expand Down Expand Up @@ -38,7 +38,7 @@ def _format_callbacks(cb):
cb = ''

def format_cb(callback):
return events._format_callback_source(callback, ())
return format_helpers._format_callback_source(callback, ())

if size == 1:
cb = format_cb(cb[0])
Expand Down
2 changes: 1 addition & 1 deletion Lib/asyncio/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

# Number of stack entries to capture in debug mode.
# The larger the number, the slower the operation in debug mode
# (see extract_stack() in events.py).
# (see extract_stack() in format_helpers.py).
DEBUG_STACK_DEPTH = 10
10 changes: 5 additions & 5 deletions Lib/asyncio/coroutines.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

from collections.abc import Awaitable, Coroutine

from . import constants
from . import events
from . import base_futures
from . import constants
from . import format_helpers
from .log import logger


Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(self, gen, func=None):
assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
self.gen = gen
self.func = func # Used to unwrap @coroutine decorator
self._source_traceback = events.extract_stack(sys._getframe(1))
self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
self.__name__ = getattr(gen, '__name__', None)
self.__qualname__ = getattr(gen, '__qualname__', None)

Expand Down Expand Up @@ -243,7 +243,7 @@ def _format_coroutine(coro):
func = coro

if coro_name is None:
coro_name = events._format_callback(func, (), {})
coro_name = format_helpers._format_callback(func, (), {})

try:
coro_code = coro.gi_code
Expand All @@ -260,7 +260,7 @@ def _format_coroutine(coro):
if (isinstance(coro, CoroWrapper) and
not inspect.isgeneratorfunction(coro.func) and
coro.func is not None):
source = events._get_function_source(coro.func)
source = format_helpers._get_function_source(coro.func)
if source is not None:
filename, lineno = source
if coro_frame is None:
Expand Down
83 changes: 7 additions & 76 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,86 +11,14 @@
'_get_running_loop',
)

import functools
import inspect
import os
import reprlib
import socket
import subprocess
import sys
import threading
import traceback

from . import constants


def _get_function_source(func):
func = inspect.unwrap(func)
if inspect.isfunction(func):
code = func.__code__
return (code.co_filename, code.co_firstlineno)
if isinstance(func, functools.partial):
return _get_function_source(func.func)
if isinstance(func, functools.partialmethod):
return _get_function_source(func.func)
return None


def _format_args_and_kwargs(args, kwargs):
"""Format function arguments and keyword arguments.

Special case for a single parameter: ('hello',) is formatted as ('hello').
"""
# use reprlib to limit the length of the output
items = []
if args:
items.extend(reprlib.repr(arg) for arg in args)
if kwargs:
items.extend(f'{k}={reprlib.repr(v)}' for k, v in kwargs.items())
return '({})'.format(', '.join(items))


def _format_callback(func, args, kwargs, suffix=''):
if isinstance(func, functools.partial):
suffix = _format_args_and_kwargs(args, kwargs) + suffix
return _format_callback(func.func, func.args, func.keywords, suffix)

if hasattr(func, '__qualname__'):
func_repr = getattr(func, '__qualname__')
elif hasattr(func, '__name__'):
func_repr = getattr(func, '__name__')
else:
func_repr = repr(func)

func_repr += _format_args_and_kwargs(args, kwargs)
if suffix:
func_repr += suffix
return func_repr


def _format_callback_source(func, args):
func_repr = _format_callback(func, args, None)
source = _get_function_source(func)
if source:
func_repr += f' at {source[0]}:{source[1]}'
return func_repr


def extract_stack(f=None, limit=None):
"""Replacement for traceback.extract_stack() that only does the
necessary work for asyncio debug mode.
"""
if f is None:
f = sys._getframe().f_back
if limit is None:
# Limit the amount of work to a reasonable amount, as extract_stack()
# can be called for each coroutine and future in debug mode.
limit = constants.DEBUG_STACK_DEPTH
stack = traceback.StackSummary.extract(traceback.walk_stack(f),
limit=limit,
lookup_lines=False)
stack.reverse()
return stack
from . import format_helpers


class Handle:
Expand All @@ -106,7 +34,8 @@ def __init__(self, callback, args, loop):
self._cancelled = False
self._repr = None
if self._loop.get_debug():
self._source_traceback = extract_stack(sys._getframe(1))
self._source_traceback = format_helpers.extract_stack(
sys._getframe(1))
else:
self._source_traceback = None

Expand All @@ -115,7 +44,8 @@ def _repr_info(self):
if self._cancelled:
info.append('cancelled')
if self._callback is not None:
info.append(_format_callback_source(self._callback, self._args))
info.append(format_helpers._format_callback_source(
self._callback, self._args))
if self._source_traceback:
frame = self._source_traceback[-1]
info.append(f'created at {frame[0]}:{frame[1]}')
Expand Down Expand Up @@ -145,7 +75,8 @@ def _run(self):
try:
self._callback(*self._args)
except Exception as exc:
cb = _format_callback_source(self._callback, self._args)
cb = format_helpers._format_callback_source(
self._callback, self._args)
msg = f'Exception in callback {cb}'
context = {
'message': msg,
Expand Down
75 changes: 75 additions & 0 deletions Lib/asyncio/format_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import functools
import inspect
import reprlib
import traceback

from . import constants


def _get_function_source(func):
func = inspect.unwrap(func)
if inspect.isfunction(func):
code = func.__code__
return (code.co_filename, code.co_firstlineno)
if isinstance(func, functools.partial):
return _get_function_source(func.func)
if isinstance(func, functools.partialmethod):
return _get_function_source(func.func)
return None


def _format_callback_source(func, args):
func_repr = _format_callback(func, args, None)
source = _get_function_source(func)
if source:
func_repr += f' at {source[0]}:{source[1]}'
return func_repr


def _format_args_and_kwargs(args, kwargs):
"""Format function arguments and keyword arguments.

Special case for a single parameter: ('hello',) is formatted as ('hello').
"""
# use reprlib to limit the length of the output
items = []
if args:
items.extend(reprlib.repr(arg) for arg in args)
if kwargs:
items.extend(f'{k}={reprlib.repr(v)}' for k, v in kwargs.items())
return '({})'.format(', '.join(items))


def _format_callback(func, args, kwargs, suffix=''):
if isinstance(func, functools.partial):
suffix = _format_args_and_kwargs(args, kwargs) + suffix
return _format_callback(func.func, func.args, func.keywords, suffix)

if hasattr(func, '__qualname__'):
func_repr = getattr(func, '__qualname__')
elif hasattr(func, '__name__'):
func_repr = getattr(func, '__name__')
else:
func_repr = repr(func)

func_repr += _format_args_and_kwargs(args, kwargs)
if suffix:
func_repr += suffix
return func_repr


def extract_stack(f=None, limit=None):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep extract_stack in events.py?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. extract_stack is used in coroutines.py which in turn is used by _asynciomodule.c.

"""Replacement for traceback.extract_stack() that only does the
necessary work for asyncio debug mode.
"""
if f is None:
f = sys._getframe().f_back
if limit is None:
# Limit the amount of work to a reasonable amount, as extract_stack()
# can be called for each coroutine and future in debug mode.
limit = constants.DEBUG_STACK_DEPTH
stack = traceback.StackSummary.extract(traceback.walk_stack(f),
limit=limit,
lookup_lines=False)
stack.reverse()
return stack
4 changes: 3 additions & 1 deletion Lib/asyncio/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from . import base_futures
from . import events
from . import format_helpers


CancelledError = base_futures.CancelledError
Expand Down Expand Up @@ -79,7 +80,8 @@ def __init__(self, *, loop=None):
self._loop = loop
self._callbacks = []
if self._loop.get_debug():
self._source_traceback = events.extract_stack(sys._getframe(1))
self._source_traceback = format_helpers.extract_stack(
sys._getframe(1))

_repr_info = base_futures._future_repr_info

Expand Down
23 changes: 19 additions & 4 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Support for tasks, coroutines and the scheduler."""

__all__ = (
'Task',
'Task', 'create_task',
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
Expand Down Expand Up @@ -67,13 +67,19 @@ def all_tasks(cls, loop=None):
return {t for t in cls._all_tasks if t._loop is loop}

def __init__(self, coro, *, loop=None):
assert coroutines.iscoroutine(coro), repr(coro)
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
self._coro = coro
self._fut_waiter = None
if not coroutines.iscoroutine(coro):
# raise after Future.__init__(), attrs are required for __del__
# prevent logging for pending task in __del__
self._log_destroy_pending = False
raise TypeError(f"a coroutine was expected, got {coro!r}")

self._must_cancel = False
self._fut_waiter = None
self._coro = coro

self._loop.call_soon(self._step)
self.__class__._all_tasks.add(self)

Expand Down Expand Up @@ -263,6 +269,15 @@ def _wakeup(self, future):
Task = _CTask = _asyncio.Task


def create_task(coro):
"""Schedule the execution of a coroutine object in a spawn task.

Return a Task object.
"""
loop = events.get_running_loop()
return loop.create_task(coro)


# wait() and as_completed() similar to those in PEP 3148.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
Expand Down
Loading