Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 19 additions & 22 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,25 @@ def __init__(self, max_workers=None, thread_name_prefix='',
self._initializer = initializer
self._initargs = initargs

# eagerly create all worker threads

# When the executor gets destroyed, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
for thread_num in range(max_workers):
thread_name = '%s_%d' % (self._thread_name_prefix or self,
thread_num)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't the weakref shared among all threads?
If weakref_cb can call q.put(None) max_workers times. Or we can use __del__ method for it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, there is no need to call q.put(None) max_workers times.
First worker received None calls q.put(None) for next worker.

# Notice other workers
work_queue.put(None)

self._work_queue,
self._initializer,
self._initargs))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue

def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
Expand All @@ -157,31 +176,9 @@ def submit(self, fn, *args, **kwargs):
w = _WorkItem(f, fn, args, kwargs)

self._work_queue.put(w)
self._adjust_thread_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__

def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue,
self._initializer,
self._initargs))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue

def _initializer_failed(self):
with self._shutdown_lock:
self._broken = ('A thread initializer failed, the thread pool '
Expand Down
3 changes: 2 additions & 1 deletion Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def test_threads_terminate(self):
self.executor.submit(mul, 21, 2)
self.executor.submit(mul, 6, 7)
self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._threads), 3)
self.assertEqual(len(self.executor._threads), self.worker_count)
self.executor.shutdown()
for t in self.executor._threads:
t.join()
Expand Down Expand Up @@ -742,6 +742,7 @@ def test_default_workers(self):
executor = self.executor_type()
self.assertEqual(executor._max_workers,
(os.cpu_count() or 1) * 5)
executor.shutdown()


class ProcessPoolExecutorTest(ExecutorTest):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Modify ThreadPoolExecutor to spawn worker threads on creation, instead of on
work submission.