Skip to content

bpo-33097: Fix submit accepting callable after executor shutdown by interpreter exit#6144

Merged
pitrou merged 8 commits intopython:masterfrom
mrknmc:fix-submit-after-interpreter-shutdown
Apr 10, 2018
Merged

bpo-33097: Fix submit accepting callable after executor shutdown by interpreter exit#6144
pitrou merged 8 commits intopython:masterfrom
mrknmc:fix-submit-after-interpreter-shutdown

Conversation

@mrknmc
Copy link
Copy Markdown
Contributor

@mrknmc mrknmc commented Mar 18, 2018

@the-knights-who-say-ni
Copy link
Copy Markdown

Hello, and thanks for your contribution!

I'm a bot set up to make sure that the project can legally accept your contribution by verifying you have signed the PSF contributor agreement (CLA).

Unfortunately we couldn't find an account corresponding to your GitHub username on bugs.python.org (b.p.o) to verify you have signed the CLA (this might be simply due to a missing "GitHub Name" entry in your b.p.o account settings). This is necessary for legal reasons before we can look at your contribution. Please follow the steps outlined in the CPython devguide to rectify this issue.

Thanks again to your contribution and we look forward to looking at it!

@mrknmc mrknmc force-pushed the fix-submit-after-interpreter-shutdown branch 2 times, most recently from a454482 to 60a1efa Compare March 18, 2018 17:03
@mrknmc mrknmc force-pushed the fix-submit-after-interpreter-shutdown branch from 60a1efa to d7f7d45 Compare March 18, 2018 17:04
@mrknmc mrknmc changed the title bpo-33097: Fix issue where submit accepts new tasks after interpreter shutdown bpo-33097: Fix submit accepting callable after executor shutdown by interpreter exit Mar 18, 2018
…nterpreter exit. (pythonGH-6144)

Executors in concurrent.futures accepted tasks after executor was
shutdown by interpreter exit. Tasks were left in PENDING state forever.
This fix changes submit to instead raise a RuntimeError.
@mrknmc mrknmc force-pushed the fix-submit-after-interpreter-shutdown branch from d7f7d45 to b1133c8 Compare March 18, 2018 17:28
Copy link
Copy Markdown
Contributor

@tomMoral tomMoral left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for spotting this! Here are a few comments on your patch.

This behavior is caused by the fact that when the interpreter is shutting down, the executor is not flagged as shutdown. For ProcessPoolExecutor, in L428, we do not set the _shutdown_thread flag as we consider that no job can be submitted after we detect that the executor is shutting down. We can thus set _shutdown_thread=True when the executor exists at this point to have a proper state in the executor.

The same inconsistency in the flag exists for ThreadPoolExecutor and fixing both would be a nice addition to this PR.

On the other hand, if the executor has not been started before the interpreter exit, a submit in atexit would launch processes and the cleaning process tends to be messy. So adding a check on global interpreter shutdown at submit avoids this messy state.

Comment thread Lib/test/test_concurrent_futures.py Outdated
@atexit.register
def run_last():
try:
t.submit(lambda: None)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lambda expression are not picklable so this should be changed to id, None to avoid adding complexity.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this to id, None in 10fb3c0

Comment thread Lib/concurrent/futures/process.py Outdated
if self._broken:
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
if _global_shutdown or self._shutdown_thread:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add an explicit RuntimeError stating that the submit failed because the interpreter is shutting down.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to an explicit RuntimeError in 881924f5bb37a38529ed2768d0227a5c528b45f0

else:
from multiprocessing import get_context
context = get_context(context)
t = {executor_type}(5, mp_context=context)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, the executor is not started before the call in atexit. This causes another set of problem as the sub{Process,Thread} are not started and cleaned properly. For your proposed test, I would add a t.submit(id, 42).result() to be sure that the executor has been started before the atexit calls.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added t.submit(id, 42).result() to the test in d5c28194a0b842833ac4729f5f333ce718120201

@tomMoral
Copy link
Copy Markdown
Contributor

tomMoral commented Mar 24, 2018

For the record, this script causes a deadlock:

import atexit
from multiprocessing import util
from multiprocessing import get_context

util.log_to_stderr(5)

executor = None

@atexit.register
def run_last():
    global executor
    if executor:
        f = executor.submit(id, None)
        util.debug("running future on executor flag {}\n\n\n".format(
            executor._shutdown_thread))
        f.result()


from concurrent.futures import ProcessPoolExecutor
if __name__ == "__main__":
    context = get_context('spawn')
    executor = ProcessPoolExecutor(5, mp_context=context)
    executor.submit(id, 42).result()

@mrknmc
Copy link
Copy Markdown
Contributor Author

mrknmc commented Mar 24, 2018

Hi @tomMoral, thanks for taking a look - I added changes as suggested.

Regarding adding executor._shutdown_thread = True on line
428, I ran into issues changing this. When the executor is deleted using del executor, for example in test test_del_shutdown (test.test_concurrent_futures.ProcessPoolForkProcessPoolShutdownTest), the queue management thread no longer has a reference to it - executor_reference returns None.

Do you think we should check it's not None and still set _shutdown_thread to True as follows?

        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    # start of change
                    if executor is not None:
                        executor._shutdown_thread = True
                    # end of change
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None

I am not sure how to fix this inconsistency in ThreadPoolExecutor since there is no running management thread that could set the flag and there is no way to reference the executor from _python_exit. One option would be to set the flag from the worker threads on line 91 but that doesn't feel like the right place.

@tomMoral
Copy link
Copy Markdown
Contributor

tomMoral commented Mar 24, 2018

Great for the changes.

For the _shutdown_thread, I would move it up, to avoid having new tasks submitted as soon as possible:

if shutting_down():
            try:
+              # Flag the executor as shutting down as early as possible if it is
+              # not gc-ed yet.
+               if executor is not None:
+                   executor._shutdown_thread = True
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None

For the ThreadPoolExecutor, I do not see a better place than the worker and I don't think it is an issue if it is redundant.

@mrknmc mrknmc force-pushed the fix-submit-after-interpreter-shutdown branch from d5c2819 to a1853a6 Compare March 25, 2018 15:41
@mrknmc
Copy link
Copy Markdown
Contributor Author

mrknmc commented Mar 25, 2018

As suggested we now set _shutdown_thread = True in ProcessPoolExecutor - 546ec80 and _shutdown = True in ThreadPoolExecutor - a1853a6.

Can you think of a way to test this behaviour or do you think that might not be necessary?

Copy link
Copy Markdown
Member

@pitrou pitrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR and thanks @tomMoral for reviewing! I added two small comments below.

Comment thread Lib/test/test_concurrent_futures.py Outdated
from concurrent.futures import {executor_type}
if __name__ == "__main__":
context = '{context}'
if context == "":
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: you can write if not context:.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to if not context: in 6b78fc2

Comment thread Lib/test/test_concurrent_futures.py Outdated
context=getattr(self, "ctx", "")))
# Errors in atexit hooks don't change the process exit code, check
# stderr manually.
self.assertTrue(err)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or you could write:

self.assertIn("RuntimeError: cannot schedule new futures", err.decode())

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in 2ba6378

@bedevere-bot
Copy link
Copy Markdown

A Python core developer has requested some changes be made to your pull request before we can consider merging it. If you could please address their requests along with any other requests in other reviews from core developers that would be appreciated.

Once you have made the requested changes, please leave a comment on this pull request containing the phrase I have made the requested changes; please review again. I will then notify any core developers who have left a review that you're ready for them to take another look at this pull request.

@pitrou
Copy link
Copy Markdown
Member

pitrou commented Apr 8, 2018

As a side note, I don't know whether we should backport this to 3.6, given the minor chance that it might cause incompatibilites with existing code (who knows?). 3.7 should be ok.

@pitrou
Copy link
Copy Markdown
Member

pitrou commented Apr 8, 2018

Oh, and @mrknmc, could you sign the contributor's agreement as explained above? That will be required before merging your contribution :-)

test_submit_after_interpreter_shutdown
@mrknmc
Copy link
Copy Markdown
Contributor Author

mrknmc commented Apr 9, 2018

Hi @pitrou,

Thanks for looking over this! I made the suggested changes.

I also signed the agreement again. I think maybe the first time I did it I didn't have my github username on bugs.python.org

@pitrou
Copy link
Copy Markdown
Member

pitrou commented Apr 10, 2018

Thanks @mrknmc. I've pinged the PSF about the contributor agreement, perhaps it got lost somewhere.

@pitrou
Copy link
Copy Markdown
Member

pitrou commented Apr 10, 2018

Ok, the PSF got your contributor agreement!

@pitrou pitrou merged commit c4b695f into python:master Apr 10, 2018
@miss-islington
Copy link
Copy Markdown
Contributor

Thanks @mrknmc for the PR, and @pitrou for merging it 🌮🎉.. I'm working now to backport this PR to: 3.7.
🐍🍒⛏🤖

@bedevere-bot
Copy link
Copy Markdown

GH-6445 is a backport of this pull request to the 3.7 branch.

miss-islington pushed a commit to miss-islington/cpython that referenced this pull request Apr 10, 2018
…nterpreter exit (pythonGH-6144)

Executors in concurrent.futures accepted tasks after executor was shutdown by interpreter exit. Tasks were left in PENDING state forever. This fix changes submit to instead raise a RuntimeError.
(cherry picked from commit c4b695f)

Co-authored-by: Mark Nemec <[email protected]>
pitrou pushed a commit that referenced this pull request Apr 10, 2018
…nterpreter exit (GH-6144) (GH-6445)

Executors in concurrent.futures accepted tasks after executor was shutdown by interpreter exit. Tasks were left in PENDING state forever. This fix changes submit to instead raise a RuntimeError.
(cherry picked from commit c4b695f)

Co-authored-by: Mark Nemec <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants