32 changes: 32 additions & 0 deletions Doc/library/concurrent.futures.rst
@@ -94,6 +94,38 @@ Executor Objects
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

.. method:: stat()

   Introspect the current state of the :class:`Executor` and of the
   tasks processed by it.  Returns a dictionary containing the
   following keys:

   - ``worker_count``: actual number of workers in the executor.
   - ``active_worker_count``: number of workers currently running a
     task in the executor.
   - ``idle_worker_count``: number of workers currently waiting for a
     new task to be submitted to the executor.
   - ``task_count``: number of tasks pending in the executor.
   - ``active_task_count``: number of tasks currently being processed
     by the executor.
   - ``waiting_task_count``: number of tasks waiting to be processed
     by the executor.
   - ``active_tasks``: a set of work items representing the tasks
     currently being processed by the executor.  A work item is a
     container holding the function *fn*, its arguments *args* and
     *kwargs*, and the associated :class:`Future` in *future*.
   - ``waiting_tasks``: a list of work items representing the tasks
     waiting to be processed by the executor, with the same structure
     as the items in ``active_tasks``.
   - ``status``: a string holding the status of the executor, one of
     ``"not_started"``, ``"running"``, ``"broken"``,
     ``"shutting_down"`` or ``"shutdown"``.

.. versionadded:: 3.7


ThreadPoolExecutor
------------------
65 changes: 65 additions & 0 deletions Lib/concurrent/futures/_base.py
@@ -306,6 +306,22 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, set(fs) - done)


class _WorkItem(object):
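"""Container binding a submitted call (fn, args, kwargs) to the Future
that will receive its result."""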
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs

def __str__(self):
return repr(self)

def __repr__(self):
return "<WorkItem: {} args: {} kwargs: {}>".format(self.fn, self.args,
self.kwargs)


class Future(object):
"""Represents the result of an asynchronous computation."""

@@ -604,6 +620,9 @@ def shutdown(self, wait=True):
"""
pass

def stat(self):
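"""Return a dictionary describing the current state of the executor
and of the tasks it processes."""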
raise NotImplementedError()

def __enter__(self):
return self

@@ -616,3 +635,49 @@ class BrokenExecutor(RuntimeError):
"""
Raised when an executor has become non-functional after a severe failure.
"""


class _ExecutorFlags(object):
"""necessary references to maintain executor states without preventing gc

It permits to keep the information needed by queue_management_thread
and crash_detection_thread to maintain the pool without preventing the
garbage collection of unreferenced executors.
"""
def __init__(self, exc_class=BrokenExecutor):

self.started = False
self.shutdown = False
self.shutting_down = False
self.broken = False
self.shutdown_lock = threading.Lock()
self._exc_class = exc_class

def __enter__(self):
self.shutdown_lock.acquire()
if self.broken:
self.shutdown_lock.release()
raise self._exc_class(self.broken)
if self.shutting_down:
self.shutdown_lock.release()
raise RuntimeError('cannot schedule new futures after shutdown')

def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown_lock.release()

def __repr__(self):
status = "running" if self.started else "not_started"
if self.broken:
status = "broken"
elif self.shutting_down:
status = "shutdown" if self.shutdown else "shutting_down"
return status

def flag_as_shutting_down(self):
with self.shutdown_lock:
self.shutting_down = True

def flag_as_broken(self, cause):
with self.shutdown_lock:
self.shutting_down = True
self.broken = cause
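
A minimal sketch of the state transitions this class encodes, using the `_ExecutorFlags` definition above (illustrative only; the class is internal, and the import path assumes this patch is applied):

    from concurrent.futures._base import _ExecutorFlags

    flags = _ExecutorFlags()
    print(flags)                     # "not_started"
    flags.started = True
    print(flags)                     # "running"
    with flags:                      # raises if broken or shutting down
        pass                         # safe to schedule new work here
    flags.flag_as_shutting_down()
    print(flags)                     # "shutting_down"
    flags.shutdown = True
    print(flags)                     # "shutdown"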
112 changes: 77 additions & 35 deletions Lib/concurrent/futures/process.py
@@ -128,12 +128,6 @@ def _rebuild_exc(exc, tb):
exc.__cause__ = _RemoteTraceback(tb)
return exc

class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs

class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
@@ -168,6 +162,11 @@ def _on_queue_feeder_error(self, e, obj):
super()._on_queue_feeder_error(e, obj)


class _WorkId(object):
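"""Marker sent on the result queue by a worker process to signal that
the task with this work id has started running."""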
def __init__(self, work_id):
self.work_id = work_id


def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
@@ -226,6 +225,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
# Wake up queue management thread
result_queue.put(os.getpid())
return

# Notify the executor that the task with this work_id is being processed
result_queue.put(_WorkId(call_item.work_id))
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
@@ -278,8 +280,10 @@ def _add_call_item_to_queue(pending_work_items,


def _queue_management_worker(executor_reference,
executor_flags,
processes,
pending_work_items,
active_work_items,
work_ids_queue,
call_queue,
result_queue,
@@ -292,10 +296,15 @@ def _queue_management_worker(executor_reference,
executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
this thread. Used to determine if the ProcessPoolExecutor has been
garbage collected and that this function can exit.
executor_flags: A _ExecutorFlags object holding the internal state of
the ProcessPoolExecutor. It makes it possible to know whether the
executor shut down correctly without inspecting the thread object.
processes: A dict mapping pids to the ctx.Process instances used as
workers.
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
active_work_items: A set of the work ids currently being run by the
workers, e.g. {5, 6, ...}
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
call_queue: A ctx.Queue that will be filled with _CallItems
derived from _WorkItems for processing by the process workers.
@@ -307,11 +316,16 @@
"""
executor = None

-def shutting_down():
+def is_shutting_down():
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
# If the executor is broken, it should be detected in the next loop.
return (_global_shutdown or executor is None
-or executor._shutdown_thread)
+or executor_flags.shutting_down)

-def shutdown_worker():
+def shutdown_all_workers():
# This is an upper bound on the number of children alive.
n_children_alive = sum(p.is_alive() for p in processes.values())
n_children_to_stop = n_children_alive
@@ -334,6 +348,10 @@ def shutdown_worker():
for p in processes.values():
p.join()

# Flag the executor as shut down: at this point, all resources have
# been cleaned up.
executor_flags.shutdown = True

result_reader = result_queue._reader
wakeup_reader = thread_wakeup._reader
readers = [result_reader, wakeup_reader]
@@ -366,13 +384,9 @@ def shutdown_worker():
thread_wakeup.clear()
if is_broken:
# Mark the process pool broken so that submits fail right now.
-executor = executor_reference()
-if executor is not None:
-executor._broken = ('A child process terminated '
-'abruptly, the process pool is not '
-'usable anymore')
-executor._shutdown_thread = True
-executor = None
+executor_flags.flag_as_broken('A child process terminated '
+'abruptly, the process pool is not '
+'usable anymore')
bpe = BrokenProcessPool("A process in the process pool was "
"terminated abruptly while the future was "
"running or pending.")
@@ -389,17 +403,19 @@ def shutdown_worker():
# locks may be in a dirty state and block forever.
for p in processes.values():
p.terminate()
-shutdown_worker()
+shutdown_all_workers()
return
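# The result queue carries three kinds of messages: an int (the pid
# of a worker exiting cleanly), a _WorkId (the task with that id has
# started running) or a _ResultItem (a task finished or failed).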
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
-assert shutting_down()
+assert is_shutting_down()
p = processes.pop(result_item)
p.join()
if not processes:
-shutdown_worker()
+shutdown_all_workers()
return
elif isinstance(result_item, _WorkId):
active_work_items.add(result_item.work_id)
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
@@ -410,6 +426,8 @@ def shutdown_worker():
work_item.future.set_result(result_item.result)
# Delete references to object. See issue16284
del work_item

active_work_items.remove(result_item.work_id)
# Delete reference to result_item
del result_item

Expand All @@ -419,12 +437,12 @@ def shutdown_worker():
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
-if shutting_down():
+if is_shutting_down():
try:
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
-shutdown_worker()
+shutdown_all_workers()
return
except Full:
# This is not a problem: we will eventually be woken up (in
@@ -519,12 +537,11 @@ def __init__(self, max_workers=None, mp_context=None,
# Map of pids to processes
self._processes = {}

-# Shutdown is a two-step process.
-self._shutdown_thread = False
-self._shutdown_lock = threading.Lock()
-self._broken = False
+# Internal variables and flags
+self._flags = _base._ExecutorFlags(exc_class=BrokenProcessPool)
self._queue_count = 0
self._pending_work_items = {}
self._active_work_items = set()
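# Work ids of the tasks currently being run by the worker processes.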

# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
@@ -564,15 +581,18 @@ def weakref_cb(_,
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
self._flags,
self._processes,
self._pending_work_items,
self._active_work_items,
self._work_ids,
self._call_queue,
self._result_queue,
self._queue_management_thread_wakeup),
name="QueueManagerThread")
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
self._flags.started = True
_threads_wakeups[self._queue_management_thread] = \
self._queue_management_thread_wakeup

@@ -588,14 +608,11 @@ def _adjust_process_count(self):
self._processes[p.pid] = p

def submit(self, fn, *args, **kwargs):
-with self._shutdown_lock:
-if self._broken:
-raise BrokenProcessPool(self._broken)
-if self._shutdown_thread:
-raise RuntimeError('cannot schedule new futures after shutdown')

+# The _flags context manager checks that the executor is running and
+# raises the appropriate error otherwise.
+with self._flags:
f = _base.Future()
-w = _WorkItem(f, fn, args, kwargs)
+w = _base._WorkItem(f, fn, args, kwargs)

self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
@@ -605,6 +622,7 @@ def submit(self, fn, *args, **kwargs):

self._start_queue_management_thread()
return f

submit.__doc__ = _base.Executor.submit.__doc__

def map(self, fn, *iterables, timeout=None, chunksize=1):
@@ -637,23 +655,47 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
return _chain_from_iterable_of_lists(results)

def shutdown(self, wait=True):
-with self._shutdown_lock:
-self._shutdown_thread = True
+self._flags.flag_as_shutting_down()
if self._queue_management_thread:
# Wake up queue management thread
self._queue_management_thread_wakeup.wakeup()
if wait:
self._queue_management_thread.join()
elif self._call_queue is not None:
# In this case, the executor has not been started so we need to
# manually close the _call_queue.
self._call_queue.close()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None

if self._call_queue is not None:
self._call_queue.close()
if wait:
self._call_queue.join_thread()
self._call_queue = None
self._result_queue = None
self._processes = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__

def stat(self):
# Take snapshots first: the queue management thread mutates these
# structures concurrently, so tolerate work ids whose item has
# already been removed from the pending map.
active_ids = set(self._active_work_items)
pending = dict(self._pending_work_items)
active_tasks = {pending[w] for w in active_ids if w in pending}
waiting_tasks = [w for w in pending.values()
if w not in active_tasks]
worker_count = len(self._processes) if self._processes else 0
status = str(self._flags)
_stat = dict(
worker_count=worker_count,
active_worker_count=len(active_tasks),
idle_worker_count=worker_count - len(active_tasks),
task_count=len(active_tasks) + len(waiting_tasks),
active_task_count=len(active_tasks),
waiting_task_count=len(waiting_tasks),
active_tasks=active_tasks,
waiting_tasks=waiting_tasks,
status=status,
)
return _stat


atexit.register(_python_exit)