From 602d0722e2cfcab7dfe8b3259a4f0309067a3628 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 3 Nov 2017 00:30:25 +0100 Subject: [PATCH 1/4] ENH add introspection API for concurrent.futures Executor --- Doc/library/concurrent.futures.rst | 67 +++++++ Lib/concurrent/futures/_base.py | 40 ++++ Lib/concurrent/futures/process.py | 52 +++++- Lib/concurrent/futures/thread.py | 118 +++++++----- Lib/test/test_concurrent_futures.py | 172 +++++++++++++++++- .../2017-11-09-15-36-01.bpo-22281.cA7VRH.rst | 1 + 6 files changed, 399 insertions(+), 51 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-11-09-15-36-01.bpo-22281.cA7VRH.rst diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 707d24dc2529cc..838997e283827e 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -94,6 +94,73 @@ Executor Objects e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt') + The following :class:`Executor` methods are meant for use to introspect the + state of the :class:`Executor` and the tasks processed by it. + + .. method:: worker_count() + + Return the actual number of workers in the executor. + + .. versionchanged:: 3.7 + Added the *worker_count* method. + + .. method:: active_worker_count() + + Return the number of workers currently running a task in the executor. + + .. versionchanged:: 3.7 + Added the *active_worker_count* method. + + .. method:: idle_worker_count() + + Return the number of workers currently waiting for a new task to be + submitted to the executor. + + .. versionchanged:: 3.7 + Added the *idle_worker_count* method. + + .. method:: task_count() + + Return the number of task pending for the executor. + + .. versionchanged:: 3.7 + Added the *task_count* method. + + .. method:: active_task_count() + + Return the number of task which are currently being processed by the + executor. + + .. 
versionchanged:: 3.7 + Added the *active_task_count* method. + + .. method:: waiting_task_count() + + Return the number of task waiting to be processed by the executor. + + .. versionchanged:: 3.7 + Added the *waiting_task_count* method. + + .. method:: active_tasks() + + Return a dictionary with WorkItems representing the tasks which are + currently being processed by the executor. The WorkItem object is a + container holding the function *fn*, its arguments *args* and *kwargs* + and the associated :class:`Future` in *future*. + + .. versionchanged:: 3.7 + Added the *active_tasks* method. + + .. method:: waiting_tasks() + + Return a dictionary with WorkItems representing the tasks waiting to be + processed by the executor. The WorkItem object is a container holding + the function *fn*, its arguments *args* and *kwargs* and the associated + :class:`Future` in *future*. + + .. versionchanged:: 3.7 + Added the *waiting_tasks* method. + ThreadPoolExecutor ------------------ diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 4f22f7ee0e6d0a..e7a2c7db3ccd7b 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -306,6 +306,22 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): done.update(waiter.finished_futures) return DoneAndNotDoneFutures(done, set(fs) - done) + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def __str__(self): + return repr(self) + + def __repr__(self): + return "".format(self.fn, self.args, + self.kwargs) + + class Future(object): """Represents the result of an asynchronous computation.""" @@ -604,6 +620,30 @@ def shutdown(self, wait=True): """ pass + def worker_count(self): + raise NotImplementedError() + + def active_worker_count(self): + raise NotImplementedError() + + def idle_worker_count(self): + raise NotImplementedError() + + def task_count(self): + raise 
NotImplementedError() + + def active_task_count(self): + raise NotImplementedError() + + def waiting_task_count(self): + raise NotImplementedError() + + def active_tasks(self): + raise NotImplementedError() + + def waiting_tasks(self): + raise NotImplementedError() + def __enter__(self): return self diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index aaa5151e017c0f..c6d6695e93b66a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -128,12 +128,6 @@ def _rebuild_exc(exc, tb): exc.__cause__ = _RemoteTraceback(tb) return exc -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs class _ResultItem(object): def __init__(self, work_id, exception=None, result=None): @@ -168,6 +162,11 @@ def _on_queue_feeder_error(self, e, obj): super()._on_queue_feeder_error(e, obj) +class _WorkId(object): + def __init__(self, work_id): + self.work_id = work_id + + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. """ it = zip(*iterables) @@ -226,6 +225,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs): # Wake up queue management thread result_queue.put(os.getpid()) return + + # Notify the executor that the job work_id is being processed + result_queue.put(_WorkId(call_item.work_id)) try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: @@ -280,6 +282,7 @@ def _add_call_item_to_queue(pending_work_items, def _queue_management_worker(executor_reference, processes, pending_work_items, + active_work_items, work_ids_queue, call_queue, result_queue, @@ -296,6 +299,8 @@ def _queue_management_worker(executor_reference, workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + active_work_items: A dict mapping work ids to _WorkItems being run e.g. 
+ {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). call_queue: A ctx.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. @@ -400,6 +405,8 @@ def shutdown_worker(): if not processes: shutdown_worker() return + elif isinstance(result_item, _WorkId): + active_work_items.add(result_item.work_id) elif result_item is not None: work_item = pending_work_items.pop(result_item.work_id, None) # work_item can be None if another process terminated (see above) @@ -410,6 +417,8 @@ def shutdown_worker(): work_item.future.set_result(result_item.result) # Delete references to object. See issue16284 del work_item + + active_work_items.remove(result_item.work_id) # Delete reference to result_item del result_item @@ -525,6 +534,7 @@ def __init__(self, max_workers=None, mp_context=None, self._broken = False self._queue_count = 0 self._pending_work_items = {} + self._active_work_items = set() # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to @@ -566,6 +576,7 @@ def weakref_cb(_, args=(weakref.ref(self, weakref_cb), self._processes, self._pending_work_items, + self._active_work_items, self._work_ids, self._call_queue, self._result_queue, @@ -595,7 +606,7 @@ def submit(self, fn, *args, **kwargs): raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) + w = _base._WorkItem(f, fn, args, kwargs) self._pending_work_items[self._queue_count] = w self._work_ids.put(self._queue_count) @@ -656,4 +667,31 @@ def shutdown(self, wait=True): self._processes = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ + def worker_count(self): + return len(self._processes) + + def active_worker_count(self): + return self.active_task_count() + + def idle_worker_count(self): + return self.worker_count() - self.active_worker_count() + + def task_count(self): + 
return len(self._pending_work_items) + + def active_task_count(self): + return len(self._active_work_items) + + def waiting_task_count(self): + return self.task_count() - self.active_task_count() + + def active_tasks(self): + return {self._pending_work_items[t] for t in self._active_work_items} + + def waiting_tasks(self): + tasks = [v for k, v in self._pending_work_items.items() + if k not in self._active_work_items] + return tasks + + atexit.register(_python_exit) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2e7100bc3529d4..9470f03cbe9fc5 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -41,13 +41,7 @@ def _python_exit(): atexit.register(_python_exit) - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs +class _WorkItem(_base._WorkItem): def run(self): if not self.future.set_running_or_notify_cancel(): @@ -63,36 +57,48 @@ def run(self): self.future.set_result(result) -def _worker(executor_reference, work_queue, initializer, initargs): - if initializer is not None: +class _Worker(threading.Thread): + """Worker Thread for ThreadPoolExecutor --used for introspection""" + def __init__(self, executor_reference, work_queue, initializer, initargs, + name): + super().__init__(name=name) + self._executor_reference = executor_reference + self._work_queue = work_queue + self._initializer = initializer + self._initargs = initargs + self._work_item = None + + def run(self): + if self._initializer is not None: + try: + self._initializer(*self._initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', + exc_info=True) + executor = self._executor_reference() + if executor is not None: + executor._initializer_failed() + return try: - initializer(*initargs) + while True: + self._work_item = self._work_queue.get(block=True) + if self._work_item is not None: + 
self._work_item.run() + # Delete references to object. See issue16284 + self._work_item = None + continue + executor = self._executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Notice other workers + self._work_queue.put(None) + return + del executor except BaseException: - _base.LOGGER.critical('Exception in initializer:', exc_info=True) - executor = executor_reference() - if executor is not None: - executor._initializer_failed() - return - try: - while True: - work_item = work_queue.get(block=True) - if work_item is not None: - work_item.run() - # Delete references to object. See issue16284 - del work_item - continue - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. 
- if _shutdown or executor is None or executor._shutdown: - # Notice other workers - work_queue.put(None) - return - del executor - except BaseException: - _base.LOGGER.critical('Exception in worker', exc_info=True) + _base.LOGGER.critical('Exception in worker', exc_info=True) class BrokenThreadPool(_base.BrokenExecutor): @@ -154,22 +160,48 @@ def submit(self, fn, *args, **kwargs): return f submit.__doc__ = _base.Executor.submit.__doc__ + def worker_count(self): + return len(self._threads) + + def active_worker_count(self): + return self.active_task_count() + + def idle_worker_count(self): + return self.worker_count() - self.active_worker_count() + + def task_count(self): + return self.active_task_count() + self.waiting_task_count() + + def active_task_count(self): + return sum(1 for t in self._threads if t._work_item) + + def waiting_task_count(self): + return self._work_queue.qsize() + + def active_tasks(self): + return set(t._work_item for t in self._threads + if t._work_item) + + def waiting_tasks(self): + active = self.active_tasks() + with self._work_queue.mutex: + return [task for task in self._work_queue.queue + if task not in active] + def _adjust_thread_count(self): # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. + # Create a new thread if we're not at the max, and we + # don't have enough idle threads to handle pending tasks. 
num_threads = len(self._threads) - if num_threads < self._max_workers: + if (num_threads < self._max_workers and + self.idle_worker_count() < self._work_queue.qsize()): thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads) - t = threading.Thread(name=thread_name, target=_worker, - args=(weakref.ref(self, weakref_cb), - self._work_queue, - self._initializer, - self._initargs)) + t = _Worker(weakref.ref(self, weakref_cb), self._work_queue, + self._initializer, self._initargs, name=thread_name) t.daemon = True t.start() self._threads.add(t) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 675cd7ae05e5fc..0499784a9c73c0 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -318,7 +318,6 @@ def test_threads_terminate(self): self.executor.submit(mul, 21, 2) self.executor.submit(mul, 6, 7) self.executor.submit(mul, 3, 14) - self.assertEqual(len(self.executor._threads), 3) self.executor.shutdown() for t in self.executor._threads: t.join() @@ -953,6 +952,177 @@ def test_shutdown_deadlock(self): ProcessPoolSpawnMixin)) +def _init_worker(_event): + # Declare a global Event object for the executor's worker. + global event + event = _event + + +def event_wait(): + assert "event" in globals() + t0 = time.time() + event.wait(timeout=30) + # We have to check that the event did not timeout because of Event.wait + # return False when set and clear are called one after the other. 
+ # (See issue31991) + dt = time.time() - t0 + return dt < 30 + + +def event_wait_param(a): + return event_wait() + + +class IntrospectionTests: + initial_patience = 2 + + def setUp(self): + if hasattr(self, 'ctx'): + if sys.platform == "win32" and "fork" in self.ctx: + self.skipTest("require unix system") + context = get_context(self.ctx) + else: + context = threading + self.event = context.Event() + self.executor_kwargs = dict(initializer=_init_worker, + initargs=(self.event,)) + super().setUp() + + def tearDown(self): + self.event.set() + super().tearDown() + + def _wait_for_active(self, expected_count, timeout): + # Because CI can be slow, add a patience mechanism for the task to be + # activated. + deadline = time.time() + timeout + active_task_count = self.executor.active_task_count() + while active_task_count < expected_count and time.time() < deadline: + time.sleep(.1) + active_task_count = self.executor.active_task_count() + + def test_worker_count(self): + self.assertEqual(self.executor.worker_count(), self.worker_count) + out = self.executor.submit(event_wait) + + self._wait_for_active(1, self.initial_patience) + self.assertEqual(self.executor.active_worker_count(), 1) + self.assertEqual(self.executor.idle_worker_count(), + self.worker_count - 1) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(out.result()) + + def test_worker_count_full(self): + fs = [self.executor.submit(event_wait) + for _ in range(self.worker_count + 2)] + + self._wait_for_active(self.worker_count, self.initial_patience) + self.assertEqual(self.executor.active_worker_count(), + self.worker_count) + self.assertEqual(self.executor.idle_worker_count(), 0) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(all([f.result() for f in fs])) + + def test_task_count(self): + self.assertEqual(self.executor.task_count(), 0) + out = self.executor.submit(event_wait) + + 
self._wait_for_active(1, self.initial_patience) + self.assertEqual(self.executor.task_count(), 1) + self.assertEqual(self.executor.active_task_count(), 1) + self.assertEqual(self.executor.waiting_task_count(), 0) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(out.result()) + + def test_task_count_full(self): + fs = [self.executor.submit(event_wait) + for _ in range(self.worker_count + 2)] + + self._wait_for_active(self.worker_count, self.initial_patience) + self.assertEqual(self.executor.task_count(), self.worker_count + 2) + self.assertEqual(self.executor.active_task_count(), self.worker_count) + self.assertEqual(self.executor.waiting_task_count(), 2) + self.event.set() + self.event.clear() + + # Collect the first `worker_count` results to ensure the active task + # finished and the new active task are started. + self.assertTrue(all([f.result() for f in fs[:self.worker_count]])) + + # Waiting tasks should become active. + self._wait_for_active(2, self.initial_patience) + self.assertEqual(self.executor.active_task_count(), 2) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(all([f.result() for f in fs])) + + def test_task_content(self): + param = 1 + out = self.executor.submit(event_wait_param, param) + + self._wait_for_active(1, self.initial_patience) + tasks = self.executor.active_tasks() + self.assertEqual(len(tasks), 1) + self.assertEqual(tasks.pop().args, (param,)) + self.assertEqual(len(self.executor.waiting_tasks()), 0) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(out.result()) + + def test_task_contents_full(self): + fs = [self.executor.submit(event_wait_param, i) + for i in range(self.worker_count + 2)] + + self._wait_for_active(self.worker_count, self.initial_patience) + active_tasks = self.executor.active_tasks() + waiting_tasks = self.executor.waiting_tasks() + active_args = {item.args 
for item in active_tasks} + waiting_args = [item.args for item in waiting_tasks] + + # Check lengths. + self.assertEqual(len(active_tasks), self.worker_count) + self.assertEqual(len(waiting_tasks), 2) + + # Make sure right args are there for active tasks + expected_args = {(i,) for i in range(self.worker_count)} + self.assertEqual(active_args, expected_args) + + # Make sure right args in right order for waiting tasks. + expected_args = [(i,) for i in range(self.worker_count, + self.worker_count + 2)] + self.assertEqual(waiting_args, expected_args) + self.event.set() + self.event.clear() + + # Collect the first `worker_count` results to ensure the active task + # finished and the new active task are started. + self.assertTrue(all([f.result() for f in fs[:self.worker_count]])) + + self._wait_for_active(2, self.initial_patience) + # Waiting tasks should become active. + expected_args = set(expected_args) + active_tasks = self.executor.active_tasks() + waiting_tasks = self.executor.waiting_tasks() + active_args = {item.args for item in active_tasks} + self.assertEqual(active_args, expected_args) + self.assertEqual(waiting_tasks, []) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(all([f.result() for f in fs])) + + +create_executor_tests(IntrospectionTests) + + class FutureTests(BaseTestCase): def test_done_callback_with_result(self): callback_result = None diff --git a/Misc/NEWS.d/next/Library/2017-11-09-15-36-01.bpo-22281.cA7VRH.rst b/Misc/NEWS.d/next/Library/2017-11-09-15-36-01.bpo-22281.cA7VRH.rst new file mode 100644 index 00000000000000..9f8d07091816b0 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-11-09-15-36-01.bpo-22281.cA7VRH.rst @@ -0,0 +1 @@ +Add introspection API for :class:`concurrent.futures.Executor` From 3ecc4c61894a6022a869f4154a08a7acd97e3f2b Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 10 Nov 2017 14:14:23 +0100 Subject: [PATCH 2/4] ENH change introspect API to --- 
Doc/library/concurrent.futures.rst | 90 +++++++++-------------------- Lib/concurrent/futures/_base.py | 23 +------- Lib/concurrent/futures/process.py | 43 ++++++-------- Lib/concurrent/futures/thread.py | 43 +++++--------- Lib/test/test_concurrent_futures.py | 60 +++++++++++-------- 5 files changed, 96 insertions(+), 163 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 838997e283827e..aa233655cfb6df 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -94,72 +94,34 @@ Executor Objects e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt') - The following :class:`Executor` methods are meant for use to introspect the - state of the :class:`Executor` and the tasks processed by it. - - .. method:: worker_count() - - Return the actual number of workers in the executor. - - .. versionchanged:: 3.7 - Added the *worker_count* method. - - .. method:: active_worker_count() - - Return the number of workers currently running a task in the executor. - - .. versionchanged:: 3.7 - Added the *active_worker_count* method. - - .. method:: idle_worker_count() - - Return the number of workers currently waiting for a new task to be - submitted to the executor. - - .. versionchanged:: 3.7 - Added the *idle_worker_count* method. - - .. method:: task_count() - - Return the number of task pending for the executor. - - .. versionchanged:: 3.7 - Added the *task_count* method. - - .. method:: active_task_count() - - Return the number of task which are currently being processed by the - executor. - - .. versionchanged:: 3.7 - Added the *active_task_count* method. - - .. method:: waiting_task_count() - - Return the number of task waiting to be processed by the executor. - - .. versionchanged:: 3.7 - Added the *waiting_task_count* method. - - .. 
method:: active_tasks() - - Return a dictionary with WorkItems representing the tasks which are - currently being processed by the executor. The WorkItem object is a - container holding the function *fn*, its arguments *args* and *kwargs* - and the associated :class:`Future` in *future*. - - .. versionchanged:: 3.7 - Added the *active_tasks* method. - - .. method:: waiting_tasks() - - Return a dictionary with WorkItems representing the tasks waiting to be - processed by the executor. The WorkItem object is a container holding - the function *fn*, its arguments *args* and *kwargs* and the associated - :class:`Future` in *future*. + .. method:: stat() + + + The following method is meant to introspect the state of the + :class:`Executor` and the tasks processed by it. It returns a dictionary + containing the following keys: + + - `worker_count`: actual number of workers in the executor. + - `active_worker_count`: number of workers currently running a task + in the executor. + - `idle_worker_count`: number of workers currently waiting for a new + task to be submitted to the executor. + - `task_count`: number of task pending for the executor. + - `active_task_count`: number of task which are currently being + processed by the executor. + - `waiting_task_count`: number of task waiting to be processed by + the executor. + - `active_tasks`: a dictionary with WorkItems representing the tasks + which are currently being processed by the executor. The WorkItem + object is a container holding the function *fn*, its arguments + *args* and *kwargs* and the associated :class:`Future` in *future*. + - `waiting_tasks`: a dictionary with WorkItems representing the + tasks waiting to be processed by the executor. The WorkItem object + is a container holding the function *fn*, its arguments *args* and + *kwargs* and the associated :class:`Future` in *future*. .. versionchanged:: 3.7 - Added the *waiting_tasks* method. + Added the *stat* method. 
ThreadPoolExecutor diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index e7a2c7db3ccd7b..418cd32427be44 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -620,28 +620,7 @@ def shutdown(self, wait=True): """ pass - def worker_count(self): - raise NotImplementedError() - - def active_worker_count(self): - raise NotImplementedError() - - def idle_worker_count(self): - raise NotImplementedError() - - def task_count(self): - raise NotImplementedError() - - def active_task_count(self): - raise NotImplementedError() - - def waiting_task_count(self): - raise NotImplementedError() - - def active_tasks(self): - raise NotImplementedError() - - def waiting_tasks(self): + def stat(self): raise NotImplementedError() def __enter__(self): diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index c6d6695e93b66a..e3f26f6e68012f 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -417,7 +417,7 @@ def shutdown_worker(): work_item.future.set_result(result_item.result) # Delete references to object. 
See issue16284 del work_item - + active_work_items.remove(result_item.work_id) # Delete reference to result_item del result_item @@ -667,31 +667,22 @@ def shutdown(self, wait=True): self._processes = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ - def worker_count(self): - return len(self._processes) - - def active_worker_count(self): - return self.active_task_count() - - def idle_worker_count(self): - return self.worker_count() - self.active_worker_count() - - def task_count(self): - return len(self._pending_work_items) - - def active_task_count(self): - return len(self._active_work_items) - - def waiting_task_count(self): - return self.task_count() - self.active_task_count() - - def active_tasks(self): - return {self._pending_work_items[t] for t in self._active_work_items} - - def waiting_tasks(self): - tasks = [v for k, v in self._pending_work_items.items() - if k not in self._active_work_items] - return tasks + def stat(self): + active_tasks = {self._pending_work_items[t] + for t in self._active_work_items} + waiting_tasks = [t for t in self._pending_work_items.values() + if t not in active_tasks] + _stat = dict( + worker_count=len(self._processes), + active_worker_count=len(active_tasks), + idle_worker_count=len(self._processes) - len(active_tasks), + task_count=len(active_tasks) + len(waiting_tasks), + active_task_count=len(active_tasks), + waiting_task_count=len(waiting_tasks), + active_tasks=active_tasks, + waiting_tasks=waiting_tasks, + ) + return _stat atexit.register(_python_exit) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 9470f03cbe9fc5..7fd9a8e2bd073f 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -160,33 +160,22 @@ def submit(self, fn, *args, **kwargs): return f submit.__doc__ = _base.Executor.submit.__doc__ - def worker_count(self): - return len(self._threads) - - def active_worker_count(self): - return self.active_task_count() - - def 
idle_worker_count(self): - return self.worker_count() - self.active_worker_count() - - def task_count(self): - return self.active_task_count() + self.waiting_task_count() - - def active_task_count(self): - return sum(1 for t in self._threads if t._work_item) - - def waiting_task_count(self): - return self._work_queue.qsize() - - def active_tasks(self): - return set(t._work_item for t in self._threads - if t._work_item) - - def waiting_tasks(self): - active = self.active_tasks() + def stat(self): + active_tasks = set(t._work_item for t in self._threads if t._work_item) with self._work_queue.mutex: - return [task for task in self._work_queue.queue - if task not in active] + waiting_tasks = [task for task in self._work_queue.queue + if task not in active_tasks] + _stat = dict( + worker_count=len(self._threads), + active_worker_count=len(active_tasks), + idle_worker_count=len(self._threads) - len(active_tasks), + task_count=len(active_tasks) + len(waiting_tasks), + active_task_count=len(active_tasks), + waiting_task_count=len(waiting_tasks), + active_tasks=active_tasks, + waiting_tasks=waiting_tasks, + ) + return _stat def _adjust_thread_count(self): # When the executor gets lost, the weakref callback will wake up @@ -197,7 +186,7 @@ def weakref_cb(_, q=self._work_queue): # don't have enough idle threads to handle pending tasks. 
num_threads = len(self._threads) if (num_threads < self._max_workers and - self.idle_worker_count() < self._work_queue.qsize()): + self.stat()["idle_worker_count"] < self._work_queue.qsize()): thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads) t = _Worker(weakref.ref(self, weakref_cb), self._work_queue, diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 0499784a9c73c0..3dac2ee841022c 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -960,6 +960,11 @@ def _init_worker(_event): def event_wait(): assert "event" in globals() + + # Wait for event to be clear + while event.is_set(): + time.sleep(.01) + t0 = time.time() event.wait(timeout=30) # We have to check that the event did not timeout because of Event.wait @@ -996,19 +1001,20 @@ def _wait_for_active(self, expected_count, timeout): # Because CI can be slow, add a patience mechanism for the task to be # activated. deadline = time.time() + timeout - active_task_count = self.executor.active_task_count() + active_task_count = self.executor.stat()["active_task_count"] while active_task_count < expected_count and time.time() < deadline: time.sleep(.1) - active_task_count = self.executor.active_task_count() + active_task_count = self.executor.stat()["active_task_count"] def test_worker_count(self): - self.assertEqual(self.executor.worker_count(), self.worker_count) + self.assertEqual(self.executor.stat()["worker_count"], + self.worker_count) out = self.executor.submit(event_wait) self._wait_for_active(1, self.initial_patience) - self.assertEqual(self.executor.active_worker_count(), 1) - self.assertEqual(self.executor.idle_worker_count(), - self.worker_count - 1) + stat = self.executor.stat() + self.assertEqual(stat["active_worker_count"], 1) + self.assertEqual(stat["idle_worker_count"], self.worker_count - 1) self.event.set() # Make sure the test does not pass because of error in workers @@ -1016,25 +1022,26 @@ def 
test_worker_count(self): def test_worker_count_full(self): fs = [self.executor.submit(event_wait) - for _ in range(self.worker_count + 2)] + for _ in range(self.worker_count)] self._wait_for_active(self.worker_count, self.initial_patience) - self.assertEqual(self.executor.active_worker_count(), - self.worker_count) - self.assertEqual(self.executor.idle_worker_count(), 0) + stat = self.executor.stat() + self.assertEqual(stat['active_worker_count'], self.worker_count) + self.assertEqual(stat['idle_worker_count'], 0) self.event.set() # Make sure the test does not pass because of error in workers self.assertTrue(all([f.result() for f in fs])) def test_task_count(self): - self.assertEqual(self.executor.task_count(), 0) + self.assertEqual(self.executor.stat()['task_count'], 0) out = self.executor.submit(event_wait) self._wait_for_active(1, self.initial_patience) - self.assertEqual(self.executor.task_count(), 1) - self.assertEqual(self.executor.active_task_count(), 1) - self.assertEqual(self.executor.waiting_task_count(), 0) + stat = self.executor.stat() + self.assertEqual(stat['task_count'], 1) + self.assertEqual(stat['active_task_count'], 1) + self.assertEqual(stat['waiting_task_count'], 0) self.event.set() # Make sure the test does not pass because of error in workers @@ -1045,9 +1052,10 @@ def test_task_count_full(self): for _ in range(self.worker_count + 2)] self._wait_for_active(self.worker_count, self.initial_patience) - self.assertEqual(self.executor.task_count(), self.worker_count + 2) - self.assertEqual(self.executor.active_task_count(), self.worker_count) - self.assertEqual(self.executor.waiting_task_count(), 2) + stat = self.executor.stat() + self.assertEqual(stat['task_count'], self.worker_count + 2) + self.assertEqual(stat['active_task_count'], self.worker_count) + self.assertEqual(stat['waiting_task_count'], 2) self.event.set() self.event.clear() @@ -1057,7 +1065,8 @@ def test_task_count_full(self): # Waiting tasks should become active. 
self._wait_for_active(2, self.initial_patience) - self.assertEqual(self.executor.active_task_count(), 2) + stat = self.executor.stat() + self.assertEqual(stat['active_task_count'], 2) self.event.set() # Make sure the test does not pass because of error in workers @@ -1068,10 +1077,11 @@ def test_task_content(self): out = self.executor.submit(event_wait_param, param) self._wait_for_active(1, self.initial_patience) - tasks = self.executor.active_tasks() + stat = self.executor.stat() + tasks = stat['active_tasks'] self.assertEqual(len(tasks), 1) self.assertEqual(tasks.pop().args, (param,)) - self.assertEqual(len(self.executor.waiting_tasks()), 0) + self.assertEqual(len(stat['waiting_tasks']), 0) self.event.set() # Make sure the test does not pass because of error in workers @@ -1082,8 +1092,9 @@ def test_task_contents_full(self): for i in range(self.worker_count + 2)] self._wait_for_active(self.worker_count, self.initial_patience) - active_tasks = self.executor.active_tasks() - waiting_tasks = self.executor.waiting_tasks() + stat = self.executor.stat() + active_tasks = stat['active_tasks'] + waiting_tasks = stat['waiting_tasks'] active_args = {item.args for item in active_tasks} waiting_args = [item.args for item in waiting_tasks] @@ -1109,8 +1120,9 @@ def test_task_contents_full(self): self._wait_for_active(2, self.initial_patience) # Waiting tasks should become active. 
expected_args = set(expected_args) - active_tasks = self.executor.active_tasks() - waiting_tasks = self.executor.waiting_tasks() + stat = self.executor.stat() + active_tasks = stat['active_tasks'] + waiting_tasks = stat['waiting_tasks'] active_args = {item.args for item in active_tasks} self.assertEqual(active_args, expected_args) self.assertEqual(waiting_tasks, []) From f2c1e524a3084ba539906262f3189ce36278affd Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Thu, 11 Jan 2018 15:53:34 +0100 Subject: [PATCH 3/4] ENH add status in the introspect API - Use a ExecutorFlags object for more consistent status - Improve the shutdown of the ProcessPoolExecutor --- Lib/concurrent/futures/_base.py | 46 ++++++++++++++++++ Lib/concurrent/futures/process.py | 73 +++++++++++++++++------------ Lib/concurrent/futures/thread.py | 69 +++++++++++++++------------ Lib/test/test_concurrent_futures.py | 45 ++++++++++++++++-- 4 files changed, 170 insertions(+), 63 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 418cd32427be44..c3e988629e01ae 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -635,3 +635,49 @@ class BrokenExecutor(RuntimeError): """ Raised when a executor has become non-functional after a severe failure. """ + + +class _ExecutorFlags(object): + """necessary references to maintain executor states without preventing gc + + It permits to keep the information needed by queue_management_thread + and crash_detection_thread to maintain the pool without preventing the + garbage collection of unreferenced executors. 
+ """ + def __init__(self, exc_class=BrokenExecutor): + + self.started = False + self.shutdown = False + self.shutting_down = False + self.broken = False + self.shutdown_lock = threading.Lock() + self._exc_class = exc_class + + def __enter__(self): + self.shutdown_lock.acquire() + if self.broken: + self.shutdown_lock.release() + raise self._exc_class(self.broken) + if self.shutting_down: + self.shutdown_lock.release() + raise RuntimeError('cannot schedule new futures after shutdown') + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown_lock.release() + + def __repr__(self): + status = "running" if self.started else "not_started" + if self.broken: + status = "broken" + elif self.shutting_down: + status = "shutdown" if self.shutdown else "shutting_down" + return status + + def flag_as_shutting_down(self): + with self.shutdown_lock: + self.shutting_down = True + + def flag_as_broken(self, cause): + with self.shutdown_lock: + self.shutting_down = True + self.broken = cause diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index e3f26f6e68012f..26bd0a5cfbfc44 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -280,6 +280,7 @@ def _add_call_item_to_queue(pending_work_items, def _queue_management_worker(executor_reference, + executor_flags, processes, pending_work_items, active_work_items, @@ -295,6 +296,9 @@ def _queue_management_worker(executor_reference, executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. + executor_flags: A _ExecutorFlags object holding internal states of the + ProcessPoolExecutor. It permits to know if the executor correctly + shutdowned without looking at the thread object. process: A list of the ctx.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. 
@@ -312,11 +316,16 @@ def _queue_management_worker(executor_reference,
     """
     executor = None
 
-    def shutting_down():
+    def is_shutting_down():
+        # No more work items can be added if:
+        # - The interpreter is shutting down OR
+        # - The executor that owns this worker has been collected OR
+        # - The executor that owns this worker has been shutdown.
+        # If the executor is broken, it should be detected in the next loop.
         return (_global_shutdown or executor is None
-                or executor._shutdown_thread)
+                or executor_flags.shutting_down)
 
-    def shutdown_worker():
+    def shutdown_all_workers():
         # This is an upper bound on the number of children alive.
         n_children_alive = sum(p.is_alive() for p in processes.values())
         n_children_to_stop = n_children_alive
@@ -339,6 +348,10 @@ def shutdown_worker():
         for p in processes.values():
             p.join()
 
+        # Flag the executor as shutdown as at this point, all resources have
+        # been cleaned up.
+        executor_flags.shutdown = True
+
     result_reader = result_queue._reader
     wakeup_reader = thread_wakeup._reader
     readers = [result_reader, wakeup_reader]
@@ -371,13 +384,9 @@ def shutdown_worker():
             thread_wakeup.clear()
         if is_broken:
             # Mark the process pool broken so that submits fail right now.
-            executor = executor_reference()
-            if executor is not None:
-                executor._broken = ('A child process terminated '
-                                    'abruptly, the process pool is not '
-                                    'usable anymore')
-                executor._shutdown_thread = True
-                executor = None
+            executor_flags.flag_as_broken('A child process terminated '
+                                          'abruptly, the process pool is not '
+                                          'usable anymore')
             bpe = BrokenProcessPool("A process in the process pool was "
                                     "terminated abruptly while the future was "
                                     "running or pending.")
@@ -394,16 +403,16 @@ def shutdown_worker():
             # locks may be in a dirty state and block forever.
for p in processes.values(): p.terminate() - shutdown_worker() + shutdown_all_workers() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) - assert shutting_down() + assert is_shutting_down() p = processes.pop(result_item) p.join() if not processes: - shutdown_worker() + shutdown_all_workers() return elif isinstance(result_item, _WorkId): active_work_items.add(result_item.work_id) @@ -428,12 +437,12 @@ def shutdown_worker(): # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if shutting_down(): + if is_shutting_down(): try: # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: - shutdown_worker() + shutdown_all_workers() return except Full: # This is not a problem: we will eventually be woken up (in @@ -528,10 +537,8 @@ def __init__(self, max_workers=None, mp_context=None, # Map of pids to processes self._processes = {} - # Shutdown is a two-step process. 
- self._shutdown_thread = False - self._shutdown_lock = threading.Lock() - self._broken = False + # Internal variables and flags + self._flags = _base._ExecutorFlags(exc_class=BrokenProcessPool) self._queue_count = 0 self._pending_work_items = {} self._active_work_items = set() @@ -574,6 +581,7 @@ def weakref_cb(_, self._queue_management_thread = threading.Thread( target=_queue_management_worker, args=(weakref.ref(self, weakref_cb), + self._flags, self._processes, self._pending_work_items, self._active_work_items, @@ -584,6 +592,7 @@ def weakref_cb(_, name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() + self._flags.started = True _threads_wakeups[self._queue_management_thread] = \ self._queue_management_thread_wakeup @@ -599,12 +608,9 @@ def _adjust_process_count(self): self._processes[p.pid] = p def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._broken: - raise BrokenProcessPool(self._broken) - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') - + # The context in _flags asserts that the executor is running or raise + # the correct error. 
+ with self._flags: f = _base.Future() w = _base._WorkItem(f, fn, args, kwargs) @@ -616,6 +622,7 @@ def submit(self, fn, *args, **kwargs): self._start_queue_management_thread() return f + submit.__doc__ = _base.Executor.submit.__doc__ def map(self, fn, *iterables, timeout=None, chunksize=1): @@ -648,18 +655,21 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): return _chain_from_iterable_of_lists(results) def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True + self._flags.flag_as_shutting_down() if self._queue_management_thread: # Wake up queue management thread self._queue_management_thread_wakeup.wakeup() if wait: self._queue_management_thread.join() + elif self._call_queue is not None: + # In this case, the executor has not been started so we need to + # manually close the _call_queue. + self._call_queue.close() # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. self._queue_management_thread = None + if self._call_queue is not None: - self._call_queue.close() if wait: self._call_queue.join_thread() self._call_queue = None @@ -672,15 +682,18 @@ def stat(self): for t in self._active_work_items} waiting_tasks = [t for t in self._pending_work_items.values() if t not in active_tasks] + worker_count = len(self._processes) if self._processes else 0 + status = str(self._flags) _stat = dict( - worker_count=len(self._processes), + worker_count=worker_count, active_worker_count=len(active_tasks), - idle_worker_count=len(self._processes) - len(active_tasks), + idle_worker_count=worker_count - len(active_tasks), task_count=len(active_tasks) + len(waiting_tasks), active_task_count=len(active_tasks), waiting_task_count=len(waiting_tasks), active_tasks=active_tasks, waiting_tasks=waiting_tasks, + status=status ) return _stat diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 7fd9a8e2bd073f..b6fc0864410713 100644 --- 
a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -59,10 +59,11 @@ def run(self): class _Worker(threading.Thread): """Worker Thread for ThreadPoolExecutor --used for introspection""" - def __init__(self, executor_reference, work_queue, initializer, initargs, - name): + def __init__(self, executor_reference, executor_flags, work_queue, + initializer, initargs, name): super().__init__(name=name) self._executor_reference = executor_reference + self._flags = executor_flags self._work_queue = work_queue self._initializer = initializer self._initargs = initargs @@ -92,9 +93,11 @@ def run(self): # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. - if _shutdown or executor is None or executor._shutdown: + if (_shutdown or executor is None + or self._flags.shutting_down): # Notice other workers self._work_queue.put(None) + self._flags.shutdown += 1 return del executor except BaseException: @@ -136,21 +139,16 @@ def __init__(self, max_workers=None, thread_name_prefix='', self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() - self._broken = False - self._shutdown = False - self._shutdown_lock = threading.Lock() + self._flags = _base._ExecutorFlags(exc_class=BrokenThreadPool) self._thread_name_prefix = (thread_name_prefix or ("ThreadPoolExecutor-%d" % self._counter())) self._initializer = initializer self._initargs = initargs def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._broken: - raise BrokenThreadPool(self._broken) - - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') + # The context in _flags asserts that the executor is running or raise + # the correct error. 
+ with self._flags: f = _base.Future() w = _WorkItem(f, fn, args, kwargs) @@ -165,15 +163,23 @@ def stat(self): with self._work_queue.mutex: waiting_tasks = [task for task in self._work_queue.queue if task not in active_tasks] + status = str(self._flags) + if status == "shutdown" and self._flags.shutdown < len(self._threads): + # For ThreadPoolExecutor, we have to verify that all the executor's + # threads have shutdowned to correctly flag the executor as + # shutdown. + status = "shutting_down" + worker_count = len(self._threads) _stat = dict( - worker_count=len(self._threads), + worker_count=worker_count, active_worker_count=len(active_tasks), - idle_worker_count=len(self._threads) - len(active_tasks), + idle_worker_count=worker_count - len(active_tasks), task_count=len(active_tasks) + len(waiting_tasks), active_task_count=len(active_tasks), waiting_task_count=len(waiting_tasks), active_tasks=active_tasks, waiting_tasks=waiting_tasks, + status=status ) return _stat @@ -189,30 +195,33 @@ def weakref_cb(_, q=self._work_queue): self.stat()["idle_worker_count"] < self._work_queue.qsize()): thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads) - t = _Worker(weakref.ref(self, weakref_cb), self._work_queue, - self._initializer, self._initargs, name=thread_name) + t = _Worker(weakref.ref(self, weakref_cb), self._flags, + self._work_queue, self._initializer, self._initargs, + name=thread_name) t.daemon = True t.start() + self._flags.started = True self._threads.add(t) _threads_queues[t] = self._work_queue def _initializer_failed(self): - with self._shutdown_lock: - self._broken = ('A thread initializer failed, the thread pool ' - 'is not usable anymore') - # Drain work queue and mark pending futures failed - while True: - try: - work_item = self._work_queue.get_nowait() - except queue.Empty: - break - if work_item is not None: - work_item.future.set_exception(BrokenThreadPool(self._broken)) + self._flags.flag_as_broken('A thread initializer failed, the 
thread ' + 'pool is not usable anymore') + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.set_exception( + BrokenThreadPool(self._flags.broken)) + # Make sure to wakeup the other threads if + # the failure only ocurred in one of them. + self._work_queue.put(None) def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown = True - self._work_queue.put(None) + self._flags.flag_as_shutting_down() + self._work_queue.put(None) if wait: for t in self._threads: t.join() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 3dac2ee841022c..02ac9d65d63336 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -65,12 +65,15 @@ def init(x): def get_init_status(): return INITIALIZER_STATUS -def init_fail(log_queue=None): +def init_fail(log_queue=None, no_log=False): + logger = logging.getLogger('concurrent.futures') if log_queue is not None: - logger = logging.getLogger('concurrent.futures') logger.addHandler(QueueHandler(log_queue)) logger.setLevel('CRITICAL') logger.propagate = False + elif no_log: + logger.disabled = True + time.sleep(0.1) # let some futures be scheduled raise ValueError('error in initializer') @@ -241,7 +244,7 @@ def test_initializer(self): future.result() # At some point, the executor should break t1 = time.time() - while not self.executor._broken: + while not self.executor._flags.broken: if time.time() - t1 > 5: self.fail("executor not broken after 5 s.") time.sleep(0.01) @@ -1131,6 +1134,42 @@ def test_task_contents_full(self): # Make sure the test does not pass because of error in workers self.assertTrue(all([f.result() for f in fs])) + def test_status(self): + self.event.clear() + fs = self.executor.submit(event_wait) + if hasattr(self, 'ctx'): + executor_join = [self.executor._queue_management_thread] + else: + executor_join = self.executor._threads + 
self._wait_for_active(1, self.initial_patience) + self.assertEqual(self.executor.stat()["status"], "running") + + self.executor.shutdown(wait=False) + self.assertEqual(self.executor.stat()["status"], "shutting_down") + + self.event.set() + self.assertTrue(fs.result()) + for worker in executor_join: + worker.join() + + self.assertEqual(self.executor.stat()["status"], "shutdown") + + # Test not_started and broken + kwargs = dict(max_workers=2, initializer=init_fail, + initargs=(None, True)) + if hasattr(self, "ctx"): + kwargs["mp_context"] = self.get_context() + + executor = self.executor_type(**kwargs) + self.assertEqual(executor.stat()["status"], "not_started") + + + with self.assertRaises(BrokenExecutor): + executor.submit(id, 0).result() + self.assertEqual(executor.stat()["status"], "broken") + + executor.shutdown() + create_executor_tests(IntrospectionTests) From bd49fa0fb4711b7e250184a1b8058723e960169f Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Thu, 11 Jan 2018 19:13:19 +0100 Subject: [PATCH 4/4] DOC update the doc to contain 'status' --- Doc/library/concurrent.futures.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index aa233655cfb6df..596ed8088123e9 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -115,10 +115,13 @@ Executor Objects which are currently being processed by the executor. The WorkItem object is a container holding the function *fn*, its arguments *args* and *kwargs* and the associated :class:`Future` in *future*. - - `waiting_tasks`: a dictionary with WorkItems representing the + - `waiting_tasks`: a dictionary with WorkItems representing the tasks waiting to be processed by the executor. The WorkItem object is a container holding the function *fn*, its arguments *args* and *kwargs* and the associated :class:`Future` in *future*. + - `status`: a string holding the status of the executor. 
It can be + one of {"not_started", "running", "broken", "shutting_down", + "shutdown"}. .. versionchanged:: 3.7 Added the *stat* method.