From 660e12b63aed7ed59d7e7ab855ba7bc679c5c766 Mon Sep 17 00:00:00 2001 From: Sean Zimmermann Date: Wed, 18 Apr 2018 17:12:14 -0700 Subject: [PATCH] bpo-24882 - eagerly spawn thread in ThreadPoolExecutor This is an alternate solution to issue 24882. At the suggestion of pitrou, instead of implementing idle thread handling logic, this change modifies ThreadPoolExecutor to spawn all worker threads on creation. --- Lib/concurrent/futures/thread.py | 41 +++++++++---------- Lib/test/test_concurrent_futures.py | 3 +- .../2018-04-18-17-23-46.bpo-24882.3JXIwK.rst | 2 + 3 files changed, 23 insertions(+), 23 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2018-04-18-17-23-46.bpo-24882.3JXIwK.rst diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index b65dee11f72727..57fdf619435552 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -142,6 +142,25 @@ def __init__(self, max_workers=None, thread_name_prefix='', self._initializer = initializer self._initargs = initargs + # eagerly create all worker threads + + # When the executor gets destroyed, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) + for thread_num in range(max_workers): + thread_name = '%s_%d' % (self._thread_name_prefix or self, + thread_num) + t = threading.Thread(name=thread_name, target=_worker, + args=(weakref.ref(self, weakref_cb), + self._work_queue, + self._initializer, + self._initargs)) + t.daemon = True + t.start() + self._threads.add(t) + _threads_queues[t] = self._work_queue + def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._broken: @@ -157,31 +176,9 @@ def submit(self, fn, *args, **kwargs): w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) - self._adjust_thread_count() return f submit.__doc__ = _base.Executor.submit.__doc__ - def _adjust_thread_count(self): - # When the executor gets lost, the weakref callback will wake up - # the worker threads. - def weakref_cb(_, q=self._work_queue): - q.put(None) - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. - num_threads = len(self._threads) - if num_threads < self._max_workers: - thread_name = '%s_%d' % (self._thread_name_prefix or self, - num_threads) - t = threading.Thread(name=thread_name, target=_worker, - args=(weakref.ref(self, weakref_cb), - self._work_queue, - self._initializer, - self._initargs)) - t.daemon = True - t.start() - self._threads.add(t) - _threads_queues[t] = self._work_queue - def _initializer_failed(self): with self._shutdown_lock: self._broken = ('A thread initializer failed, the thread pool ' diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index b258a0eafde6d4..259aa537b6f19b 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -346,7 +346,7 @@ def test_threads_terminate(self): self.executor.submit(mul, 21, 2) self.executor.submit(mul, 6, 7) self.executor.submit(mul, 3, 14) - self.assertEqual(len(self.executor._threads), 3) + self.assertEqual(len(self.executor._threads), self.worker_count) self.executor.shutdown() for t in self.executor._threads: t.join() @@ -742,6 +742,7 @@ def test_default_workers(self): executor = self.executor_type() self.assertEqual(executor._max_workers, (os.cpu_count() or 1) * 5) + executor.shutdown() class ProcessPoolExecutorTest(ExecutorTest): diff --git a/Misc/NEWS.d/next/Library/2018-04-18-17-23-46.bpo-24882.3JXIwK.rst b/Misc/NEWS.d/next/Library/2018-04-18-17-23-46.bpo-24882.3JXIwK.rst new file mode 100644 index 00000000000000..4137b975303b86 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-04-18-17-23-46.bpo-24882.3JXIwK.rst @@ -0,0 +1,2 @@ +Modify ThreadPoolExecutor to spawn worker threads on creation, instead of on +work submission.