diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 707d24dc2529cc..596ed8088123e9 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -94,6 +94,38 @@ Executor Objects e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt') + .. method:: stat() + + + The following method is meant to introspect the state of the + :class:`Executor` and the tasks processed by it. It returns a dictionary + containing the following keys: + + - `worker_count`: actual number of workers in the executor. + - `active_worker_count`: number of workers currently running a task + in the executor. + - `idle_worker_count`: number of workers currently waiting for a new + task to be submitted to the executor. + - `task_count`: number of tasks pending for the executor. + - `active_task_count`: number of tasks which are currently being + processed by the executor. + - `waiting_task_count`: number of tasks waiting to be processed by + the executor. + - `active_tasks`: a set of WorkItems representing the tasks + which are currently being processed by the executor. The WorkItem + object is a container holding the function *fn*, its arguments + *args* and *kwargs* and the associated :class:`Future` in *future*. + - `waiting_tasks`: a list of WorkItems representing the + tasks waiting to be processed by the executor. The WorkItem object + is a container holding the function *fn*, its arguments *args* and + *kwargs* and the associated :class:`Future` in *future*. + - `status`: a string holding the status of the executor. It can be + one of {"not_started", "running", "broken", "shutting_down", + "shutdown"}. + + .. versionadded:: 3.7 + Added the *stat* method. 
+ ThreadPoolExecutor ------------------ diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 4f22f7ee0e6d0a..c3e988629e01ae 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -306,6 +306,22 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): done.update(waiter.finished_futures) return DoneAndNotDoneFutures(done, set(fs) - done) + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def __str__(self): + return repr(self) + + def __repr__(self): + return "<_WorkItem fn={} args={} kwargs={}>".format(self.fn, self.args, + self.kwargs) + + class Future(object): """Represents the result of an asynchronous computation.""" @@ -604,6 +620,9 @@ def shutdown(self, wait=True): """ pass + def stat(self): + raise NotImplementedError() + def __enter__(self): return self @@ -616,3 +635,49 @@ class BrokenExecutor(RuntimeError): """ Raised when a executor has become non-functional after a severe failure. """ + + +class _ExecutorFlags(object): + """Necessary references to maintain executor states without preventing gc + + It makes it possible to keep the information needed by queue_management_thread + and crash_detection_thread to maintain the pool without preventing the + garbage collection of unreferenced executors. 
+ """ + def __init__(self, exc_class=BrokenExecutor): + + self.started = False + self.shutdown = False + self.shutting_down = False + self.broken = False + self.shutdown_lock = threading.Lock() + self._exc_class = exc_class + + def __enter__(self): + self.shutdown_lock.acquire() + if self.broken: + self.shutdown_lock.release() + raise self._exc_class(self.broken) + if self.shutting_down: + self.shutdown_lock.release() + raise RuntimeError('cannot schedule new futures after shutdown') + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown_lock.release() + + def __repr__(self): + status = "running" if self.started else "not_started" + if self.broken: + status = "broken" + elif self.shutting_down: + status = "shutdown" if self.shutdown else "shutting_down" + return status + + def flag_as_shutting_down(self): + with self.shutdown_lock: + self.shutting_down = True + + def flag_as_broken(self, cause): + with self.shutdown_lock: + self.shutting_down = True + self.broken = cause diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index aaa5151e017c0f..26bd0a5cfbfc44 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -128,12 +128,6 @@ def _rebuild_exc(exc, tb): exc.__cause__ = _RemoteTraceback(tb) return exc -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs class _ResultItem(object): def __init__(self, work_id, exception=None, result=None): @@ -168,6 +162,11 @@ def _on_queue_feeder_error(self, e, obj): super()._on_queue_feeder_error(e, obj) +class _WorkId(object): + def __init__(self, work_id): + self.work_id = work_id + + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. 
""" it = zip(*iterables) @@ -226,6 +225,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs): # Wake up queue management thread result_queue.put(os.getpid()) return + + # Notify the executor that the job work_id is being processed + result_queue.put(_WorkId(call_item.work_id)) try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: @@ -278,8 +280,10 @@ def _add_call_item_to_queue(pending_work_items, def _queue_management_worker(executor_reference, + executor_flags, processes, pending_work_items, + active_work_items, work_ids_queue, call_queue, result_queue, @@ -292,10 +296,15 @@ def _queue_management_worker(executor_reference, executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. + executor_flags: A _ExecutorFlags object holding internal states of the + ProcessPoolExecutor. It permits to know if the executor correctly + shutdowned without looking at the thread object. process: A list of the ctx.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + active_work_items: A dict mapping work ids to _WorkItems being run e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). call_queue: A ctx.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. @@ -307,11 +316,16 @@ def _queue_management_worker(executor_reference, """ executor = None - def shutting_down(): + def is_shutting_down(): + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + # If the executor is broken, it should be detected in the next loop. 
return (_global_shutdown or executor is None - or executor._shutdown_thread) + or executor_flags.shutting_down) - def shutdown_worker(): + def shutdown_all_workers(): # This is an upper bound on the number of children alive. n_children_alive = sum(p.is_alive() for p in processes.values()) n_children_to_stop = n_children_alive @@ -334,6 +348,10 @@ def shutdown_worker(): for p in processes.values(): p.join() + # Flag the executor as shut down since at this point, all resources have + # been cleaned up. + executor_flags.shutdown = True + result_reader = result_queue._reader wakeup_reader = thread_wakeup._reader readers = [result_reader, wakeup_reader] @@ -366,13 +384,9 @@ def shutdown_worker(): thread_wakeup.clear() if is_broken: # Mark the process pool broken so that submits fail right now. - executor = executor_reference() - if executor is not None: - executor._broken = ('A child process terminated ' - 'abruptly, the process pool is not ' - 'usable anymore') - executor._shutdown_thread = True - executor = None + executor_flags.flag_as_broken('A child process terminated ' + 'abruptly, the process pool is not ' + 'usable anymore') bpe = BrokenProcessPool("A process in the process pool was " "terminated abruptly while the future was " "running or pending.") @@ -389,17 +403,19 @@ def shutdown_worker(): # locks may be in a dirty state and block forever. 
for p in processes.values(): p.terminate() - shutdown_worker() + shutdown_all_workers() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) - assert shutting_down() + assert is_shutting_down() p = processes.pop(result_item) p.join() if not processes: - shutdown_worker() + shutdown_all_workers() return + elif isinstance(result_item, _WorkId): + active_work_items.add(result_item.work_id) elif result_item is not None: work_item = pending_work_items.pop(result_item.work_id, None) # work_item can be None if another process terminated (see above) @@ -410,6 +426,8 @@ def shutdown_worker(): work_item.future.set_result(result_item.result) # Delete references to object. See issue16284 del work_item + + active_work_items.remove(result_item.work_id) # Delete reference to result_item del result_item @@ -419,12 +437,12 @@ def shutdown_worker(): # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if shutting_down(): + if is_shutting_down(): try: # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: - shutdown_worker() + shutdown_all_workers() return except Full: # This is not a problem: we will eventually be woken up (in @@ -519,12 +537,11 @@ def __init__(self, max_workers=None, mp_context=None, # Map of pids to processes self._processes = {} - # Shutdown is a two-step process. 
- self._shutdown_thread = False - self._shutdown_lock = threading.Lock() - self._broken = False + # Internal variables and flags + self._flags = _base._ExecutorFlags(exc_class=BrokenProcessPool) self._queue_count = 0 self._pending_work_items = {} + self._active_work_items = set() # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to @@ -564,8 +581,10 @@ def weakref_cb(_, self._queue_management_thread = threading.Thread( target=_queue_management_worker, args=(weakref.ref(self, weakref_cb), + self._flags, self._processes, self._pending_work_items, + self._active_work_items, self._work_ids, self._call_queue, self._result_queue, @@ -573,6 +592,7 @@ def weakref_cb(_, name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() + self._flags.started = True _threads_wakeups[self._queue_management_thread] = \ self._queue_management_thread_wakeup @@ -588,14 +608,11 @@ def _adjust_process_count(self): self._processes[p.pid] = p def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._broken: - raise BrokenProcessPool(self._broken) - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') - + # The context in _flags asserts that the executor is running or raise + # the correct error. 
+ with self._flags: f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) + w = _base._WorkItem(f, fn, args, kwargs) self._pending_work_items[self._queue_count] = w self._work_ids.put(self._queue_count) @@ -605,6 +622,7 @@ def submit(self, fn, *args, **kwargs): self._start_queue_management_thread() return f + submit.__doc__ = _base.Executor.submit.__doc__ def map(self, fn, *iterables, timeout=None, chunksize=1): @@ -637,18 +655,21 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): return _chain_from_iterable_of_lists(results) def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True + self._flags.flag_as_shutting_down() if self._queue_management_thread: # Wake up queue management thread self._queue_management_thread_wakeup.wakeup() if wait: self._queue_management_thread.join() + elif self._call_queue is not None: + # In this case, the executor has not been started so we need to + # manually close the _call_queue. + self._call_queue.close() # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. 
self._queue_management_thread = None + if self._call_queue is not None: - self._call_queue.close() if wait: self._call_queue.join_thread() self._call_queue = None @@ -656,4 +677,25 @@ def shutdown(self, wait=True): self._processes = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ + def stat(self): + active_tasks = {self._pending_work_items[t] + for t in self._active_work_items} + waiting_tasks = [t for t in self._pending_work_items.values() + if t not in active_tasks] + worker_count = len(self._processes) if self._processes else 0 + status = str(self._flags) + _stat = dict( + worker_count=worker_count, + active_worker_count=len(active_tasks), + idle_worker_count=worker_count - len(active_tasks), + task_count=len(active_tasks) + len(waiting_tasks), + active_task_count=len(active_tasks), + waiting_task_count=len(waiting_tasks), + active_tasks=active_tasks, + waiting_tasks=waiting_tasks, + status=status + ) + return _stat + + atexit.register(_python_exit) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2e7100bc3529d4..b6fc0864410713 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -41,13 +41,7 @@ def _python_exit(): atexit.register(_python_exit) - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs +class _WorkItem(_base._WorkItem): def run(self): if not self.future.set_running_or_notify_cancel(): @@ -63,36 +57,51 @@ def run(self): self.future.set_result(result) -def _worker(executor_reference, work_queue, initializer, initargs): - if initializer is not None: +class _Worker(threading.Thread): + """Worker Thread for ThreadPoolExecutor --used for introspection""" + def __init__(self, executor_reference, executor_flags, work_queue, + initializer, initargs, name): + super().__init__(name=name) + self._executor_reference = executor_reference + self._flags = executor_flags + 
self._work_queue = work_queue + self._initializer = initializer + self._initargs = initargs + self._work_item = None + + def run(self): + if self._initializer is not None: + try: + self._initializer(*self._initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', + exc_info=True) + executor = self._executor_reference() + if executor is not None: + executor._initializer_failed() + return try: - initializer(*initargs) + while True: + self._work_item = self._work_queue.get(block=True) + if self._work_item is not None: + self._work_item.run() + # Delete references to object. See issue16284 + self._work_item = None + continue + executor = self._executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if (_shutdown or executor is None + or self._flags.shutting_down): + # Notice other workers + self._work_queue.put(None) + self._flags.shutdown += 1 + return + del executor except BaseException: - _base.LOGGER.critical('Exception in initializer:', exc_info=True) - executor = executor_reference() - if executor is not None: - executor._initializer_failed() - return - try: - while True: - work_item = work_queue.get(block=True) - if work_item is not None: - work_item.run() - # Delete references to object. See issue16284 - del work_item - continue - executor = executor_reference() - # Exit if: - # - The interpreter is shutting down OR - # - The executor that owns the worker has been collected OR - # - The executor that owns the worker has been shutdown. 
- if _shutdown or executor is None or executor._shutdown: - # Notice other workers - work_queue.put(None) - return - del executor - except BaseException: - _base.LOGGER.critical('Exception in worker', exc_info=True) + _base.LOGGER.critical('Exception in worker', exc_info=True) class BrokenThreadPool(_base.BrokenExecutor): @@ -130,21 +139,16 @@ def __init__(self, max_workers=None, thread_name_prefix='', self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() - self._broken = False - self._shutdown = False - self._shutdown_lock = threading.Lock() + self._flags = _base._ExecutorFlags(exc_class=BrokenThreadPool) self._thread_name_prefix = (thread_name_prefix or ("ThreadPoolExecutor-%d" % self._counter())) self._initializer = initializer self._initargs = initargs def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._broken: - raise BrokenThreadPool(self._broken) - - if self._shutdown: - raise RuntimeError('cannot schedule new futures after shutdown') + # The context in _flags asserts that the executor is running or raise + # the correct error. + with self._flags: f = _base.Future() w = _WorkItem(f, fn, args, kwargs) @@ -154,44 +158,70 @@ def submit(self, fn, *args, **kwargs): return f submit.__doc__ = _base.Executor.submit.__doc__ + def stat(self): + active_tasks = set(t._work_item for t in self._threads if t._work_item) + with self._work_queue.mutex: + waiting_tasks = [task for task in self._work_queue.queue + if task not in active_tasks] + status = str(self._flags) + if status == "shutdown" and self._flags.shutdown < len(self._threads): + # For ThreadPoolExecutor, we have to verify that all the executor's + # threads have shutdowned to correctly flag the executor as + # shutdown. 
+ status = "shutting_down" + worker_count = len(self._threads) + _stat = dict( + worker_count=worker_count, + active_worker_count=len(active_tasks), + idle_worker_count=worker_count - len(active_tasks), + task_count=len(active_tasks) + len(waiting_tasks), + active_task_count=len(active_tasks), + waiting_task_count=len(waiting_tasks), + active_tasks=active_tasks, + waiting_tasks=waiting_tasks, + status=status + ) + return _stat + def _adjust_thread_count(self): # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. + # Create a new thread if we're not at the max, and we + # don't have enough idle threads to handle pending tasks. num_threads = len(self._threads) - if num_threads < self._max_workers: + if (num_threads < self._max_workers and + self.stat()["idle_worker_count"] < self._work_queue.qsize()): thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads) - t = threading.Thread(name=thread_name, target=_worker, - args=(weakref.ref(self, weakref_cb), - self._work_queue, - self._initializer, - self._initargs)) + t = _Worker(weakref.ref(self, weakref_cb), self._flags, + self._work_queue, self._initializer, self._initargs, + name=thread_name) t.daemon = True t.start() + self._flags.started = True self._threads.add(t) _threads_queues[t] = self._work_queue def _initializer_failed(self): - with self._shutdown_lock: - self._broken = ('A thread initializer failed, the thread pool ' - 'is not usable anymore') - # Drain work queue and mark pending futures failed - while True: - try: - work_item = self._work_queue.get_nowait() - except queue.Empty: - break - if work_item is not None: - work_item.future.set_exception(BrokenThreadPool(self._broken)) + self._flags.flag_as_broken('A thread initializer failed, the thread ' + 'pool is not usable anymore') + while 
True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.set_exception( + BrokenThreadPool(self._flags.broken)) + # Make sure to wake up the other threads if + # the failure only occurred in one of them. + self._work_queue.put(None) def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown = True - self._work_queue.put(None) + self._flags.flag_as_shutting_down() + self._work_queue.put(None) if wait: for t in self._threads: t.join() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 675cd7ae05e5fc..02ac9d65d63336 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -65,12 +65,15 @@ def init(x): def get_init_status(): return INITIALIZER_STATUS -def init_fail(log_queue=None): +def init_fail(log_queue=None, no_log=False): + logger = logging.getLogger('concurrent.futures') if log_queue is not None: - logger = logging.getLogger('concurrent.futures') logger.addHandler(QueueHandler(log_queue)) logger.setLevel('CRITICAL') logger.propagate = False + elif no_log: + logger.disabled = True + time.sleep(0.1) # let some futures be scheduled raise ValueError('error in initializer') @@ -241,7 +244,7 @@ def test_initializer(self): future.result() # At some point, the executor should break t1 = time.time() - while not self.executor._broken: + while not self.executor._flags.broken: if time.time() - t1 > 5: self.fail("executor not broken after 5 s.") time.sleep(0.01) @@ -318,7 +321,6 @@ def test_threads_terminate(self): self.executor.submit(mul, 21, 2) self.executor.submit(mul, 6, 7) self.executor.submit(mul, 3, 14) - self.assertEqual(len(self.executor._threads), 3) self.executor.shutdown() for t in self.executor._threads: t.join() @@ -953,6 +955,225 @@ def test_shutdown_deadlock(self): ProcessPoolSpawnMixin)) +def _init_worker(_event): + # Declare a global Event object for the executor's worker. 
+ global event + event = _event + + +def event_wait(): + assert "event" in globals() + + # Wait for event to be clear + while event.is_set(): + time.sleep(.01) + + t0 = time.time() + event.wait(timeout=30) + # We have to check that the event did not timeout because of Event.wait + # return False when set and clear are called one after the other. + # (See issue31991) + dt = time.time() - t0 + return dt < 30 + + +def event_wait_param(a): + return event_wait() + + +class IntrospectionTests: + initial_patience = 2 + + def setUp(self): + if hasattr(self, 'ctx'): + if sys.platform == "win32" and "fork" in self.ctx: + self.skipTest("require unix system") + context = get_context(self.ctx) + else: + context = threading + self.event = context.Event() + self.executor_kwargs = dict(initializer=_init_worker, + initargs=(self.event,)) + super().setUp() + + def tearDown(self): + self.event.set() + super().tearDown() + + def _wait_for_active(self, expected_count, timeout): + # Because CI can be slow, add a patience mechanism for the task to be + # activated. 
+ deadline = time.time() + timeout + active_task_count = self.executor.stat()["active_task_count"] + while active_task_count < expected_count and time.time() < deadline: + time.sleep(.1) + active_task_count = self.executor.stat()["active_task_count"] + + def test_worker_count(self): + self.assertEqual(self.executor.stat()["worker_count"], + self.worker_count) + out = self.executor.submit(event_wait) + + self._wait_for_active(1, self.initial_patience) + stat = self.executor.stat() + self.assertEqual(stat["active_worker_count"], 1) + self.assertEqual(stat["idle_worker_count"], self.worker_count - 1) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(out.result()) + + def test_worker_count_full(self): + fs = [self.executor.submit(event_wait) + for _ in range(self.worker_count)] + + self._wait_for_active(self.worker_count, self.initial_patience) + stat = self.executor.stat() + self.assertEqual(stat['active_worker_count'], self.worker_count) + self.assertEqual(stat['idle_worker_count'], 0) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(all([f.result() for f in fs])) + + def test_task_count(self): + self.assertEqual(self.executor.stat()['task_count'], 0) + out = self.executor.submit(event_wait) + + self._wait_for_active(1, self.initial_patience) + stat = self.executor.stat() + self.assertEqual(stat['task_count'], 1) + self.assertEqual(stat['active_task_count'], 1) + self.assertEqual(stat['waiting_task_count'], 0) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(out.result()) + + def test_task_count_full(self): + fs = [self.executor.submit(event_wait) + for _ in range(self.worker_count + 2)] + + self._wait_for_active(self.worker_count, self.initial_patience) + stat = self.executor.stat() + self.assertEqual(stat['task_count'], self.worker_count + 2) + self.assertEqual(stat['active_task_count'], 
self.worker_count) + self.assertEqual(stat['waiting_task_count'], 2) + self.event.set() + self.event.clear() + + # Collect the first `worker_count` results to ensure the active task + # finished and the new active task are started. + self.assertTrue(all([f.result() for f in fs[:self.worker_count]])) + + # Waiting tasks should become active. + self._wait_for_active(2, self.initial_patience) + stat = self.executor.stat() + self.assertEqual(stat['active_task_count'], 2) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(all([f.result() for f in fs])) + + def test_task_content(self): + param = 1 + out = self.executor.submit(event_wait_param, param) + + self._wait_for_active(1, self.initial_patience) + stat = self.executor.stat() + tasks = stat['active_tasks'] + self.assertEqual(len(tasks), 1) + self.assertEqual(tasks.pop().args, (param,)) + self.assertEqual(len(stat['waiting_tasks']), 0) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(out.result()) + + def test_task_contents_full(self): + fs = [self.executor.submit(event_wait_param, i) + for i in range(self.worker_count + 2)] + + self._wait_for_active(self.worker_count, self.initial_patience) + stat = self.executor.stat() + active_tasks = stat['active_tasks'] + waiting_tasks = stat['waiting_tasks'] + active_args = {item.args for item in active_tasks} + waiting_args = [item.args for item in waiting_tasks] + + # Check lengths. + self.assertEqual(len(active_tasks), self.worker_count) + self.assertEqual(len(waiting_tasks), 2) + + # Make sure right args are there for active tasks + expected_args = {(i,) for i in range(self.worker_count)} + self.assertEqual(active_args, expected_args) + + # Make sure right args in right order for waiting tasks. 
+ expected_args = [(i,) for i in range(self.worker_count, + self.worker_count + 2)] + self.assertEqual(waiting_args, expected_args) + self.event.set() + self.event.clear() + + # Collect the first `worker_count` results to ensure the active task + # finished and the new active task are started. + self.assertTrue(all([f.result() for f in fs[:self.worker_count]])) + + self._wait_for_active(2, self.initial_patience) + # Waiting tasks should become active. + expected_args = set(expected_args) + stat = self.executor.stat() + active_tasks = stat['active_tasks'] + waiting_tasks = stat['waiting_tasks'] + active_args = {item.args for item in active_tasks} + self.assertEqual(active_args, expected_args) + self.assertEqual(waiting_tasks, []) + self.event.set() + + # Make sure the test does not pass because of error in workers + self.assertTrue(all([f.result() for f in fs])) + + def test_status(self): + self.event.clear() + fs = self.executor.submit(event_wait) + if hasattr(self, 'ctx'): + executor_join = [self.executor._queue_management_thread] + else: + executor_join = self.executor._threads + self._wait_for_active(1, self.initial_patience) + self.assertEqual(self.executor.stat()["status"], "running") + + self.executor.shutdown(wait=False) + self.assertEqual(self.executor.stat()["status"], "shutting_down") + + self.event.set() + self.assertTrue(fs.result()) + for worker in executor_join: + worker.join() + + self.assertEqual(self.executor.stat()["status"], "shutdown") + + # Test not_started and broken + kwargs = dict(max_workers=2, initializer=init_fail, + initargs=(None, True)) + if hasattr(self, "ctx"): + kwargs["mp_context"] = self.get_context() + + executor = self.executor_type(**kwargs) + self.assertEqual(executor.stat()["status"], "not_started") + + + with self.assertRaises(BrokenExecutor): + executor.submit(id, 0).result() + self.assertEqual(executor.stat()["status"], "broken") + + executor.shutdown() + + +create_executor_tests(IntrospectionTests) + + class 
FutureTests(BaseTestCase): def test_done_callback_with_result(self): callback_result = None diff --git a/Misc/NEWS.d/next/Library/2017-11-09-15-36-01.bpo-22281.cA7VRH.rst b/Misc/NEWS.d/next/Library/2017-11-09-15-36-01.bpo-22281.cA7VRH.rst new file mode 100644 index 00000000000000..9f8d07091816b0 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-11-09-15-36-01.bpo-22281.cA7VRH.rst @@ -0,0 +1 @@ +Add introspection API for :class:`concurrent.futures.Executor`