Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ def shutdown_worker():
# - The executor that owns this worker has been shutdown.
if shutting_down():
try:
# Flag the executor as shutting down as early as possible if it
# is not gc-ed yet.
if executor is not None:
executor._shutdown_thread = True
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
Expand Down Expand Up @@ -595,6 +599,9 @@ def submit(self, fn, *args, **kwargs):
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
if _global_shutdown:
raise RuntimeError('cannot schedule new futures after '
'interpreter shutdown')

f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
Expand Down
7 changes: 7 additions & 0 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ def _worker(executor_reference, work_queue, initializer, initargs):
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
# Flag the executor as shutting down as early as possible if it
# is not gc-ed yet.
if executor is not None:
executor._shutdown = True
# Notice other workers
work_queue.put(None)
return
Expand Down Expand Up @@ -145,6 +149,9 @@ def submit(self, fn, *args, **kwargs):

if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
if _shutdown:
raise RuntimeError('cannot schedule new futures after'
'interpreter shutdown')

f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,34 @@ def test_interpreter_shutdown(self):
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")

def test_submit_after_interpreter_shutdown(self):
# Test the atexit hook for shutdown of worker threads and processes
rc, out, err = assert_python_ok('-c', """if 1:
import atexit
@atexit.register
def run_last():
try:
t.submit(id, None)
except RuntimeError:
print("runtime-error")
raise
from concurrent.futures import {executor_type}
if __name__ == "__main__":
context = '{context}'
if not context:
t = {executor_type}(5)
else:
from multiprocessing import get_context
context = get_context(context)
t = {executor_type}(5, mp_context=context)
t.submit(id, 42).result()
""".format(executor_type=self.executor_type.__name__,
context=getattr(self, "ctx", "")))
# Errors in atexit hooks don't change the process exit code, check
# stderr manually.
self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
self.assertEqual(out.strip(), b"runtime-error")

def test_hang_issue12364(self):
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
self.executor.shutdown()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Raise RuntimeError when ``executor.submit`` is called during interpreter
shutdown.