From 755736e6720dbb3289fdf0de45e31be4149f889e Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 8 Jun 2017 21:06:21 +0200 Subject: [PATCH 1/4] Fix bpo-30596: Add close() method to multiprocessing.Process --- Lib/multiprocessing/forkserver.py | 11 ++++- Lib/multiprocessing/popen_fork.py | 7 ++- Lib/multiprocessing/popen_forkserver.py | 2 +- Lib/multiprocessing/popen_spawn_posix.py | 2 +- Lib/multiprocessing/popen_spawn_win32.py | 5 +- Lib/multiprocessing/process.py | 27 +++++++++++ Lib/test/_test_multiprocessing.py | 61 ++++++++++++++++++++++++ 7 files changed, 109 insertions(+), 6 deletions(-) diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 8156dae3b79f85..92f55bf636bbe8 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -208,13 +208,20 @@ def _serve_one(s, listener, alive_r, handlers): semaphore_tracker._semaphore_tracker._fd = stfd # send pid to client processes - write_unsigned(child_w, os.getpid()) + try: + write_unsigned(child_w, os.getpid()) + except BrokenPipeError: + # client died + return 1 # run process object received over pipe code = spawn._main(child_r) # write the exit code to the pipe - write_unsigned(child_w, code) + try: + write_unsigned(child_w, code) + except BrokenPipeError: + return 1 # # Read and write unsigned numbers diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index 683b52d2271ffa..31afb9e6ca4fa2 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -17,6 +17,7 @@ def __init__(self, process_obj): sys.stdout.flush() sys.stderr.flush() self.returncode = None + self.finalizer = None self._launch(process_obj) def duplicate_for_child(self, fd): @@ -73,5 +74,9 @@ def _launch(self, process_obj): os._exit(code) else: os.close(child_w) - util.Finalize(self, os.close, (parent_r,)) + self.finalizer = util.Finalize(self, os.close, (parent_r,)) self.sentinel = parent_r + + def close(self): + if self.finalizer is not None: + self.finalizer() diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py index 222db2d90a3156..b698b8b54d6484 100644 --- a/Lib/multiprocessing/popen_forkserver.py +++ b/Lib/multiprocessing/popen_forkserver.py @@ -49,7 +49,7 @@ def _launch(self, process_obj): set_spawning_popen(None) self.sentinel, w = forkserver.connect_to_new_process(self._fds) - util.Finalize(self, os.close, (self.sentinel,)) + self.finalizer = util.Finalize(self, os.close, (self.sentinel,)) with open(w, 'wb', closefd=True) as f: f.write(buf.getbuffer()) self.pid = forkserver.read_unsigned(self.sentinel) diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py index 98f8f0ab334d2f..38151060efa2e3 100644 --- a/Lib/multiprocessing/popen_spawn_posix.py +++ b/Lib/multiprocessing/popen_spawn_posix.py @@ -62,7 +62,7 @@ def _launch(self, process_obj): f.write(fp.getbuffer()) finally: if parent_r is not None: - util.Finalize(self, os.close, (parent_r,)) + self.finalizer = util.Finalize(self, os.close, (parent_r,)) for fd in (child_r, child_w, parent_w): if fd is not None: os.close(fd) diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py index 6fd588f542673e..ecb86e96ba4364 100644 --- a/Lib/multiprocessing/popen_spawn_win32.py +++ b/Lib/multiprocessing/popen_spawn_win32.py @@ -56,7 +56,7 @@ def __init__(self, process_obj): self.returncode = None self._handle = hp self.sentinel = int(hp) - util.Finalize(self, _winapi.CloseHandle, (self.sentinel,)) + self.finalizer = util.Finalize(self, _winapi.CloseHandle, (self.sentinel,)) # send information to child set_spawning_popen(self) @@ -96,3 +96,6 @@ def terminate(self): except OSError: if self.wait(timeout=1.0) is None: raise + + def close(self): + self.finalizer() diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 37365f2e42cb69..c1cb6cfa2ad7e4 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -76,6 +76,7 @@ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, self._config = _current_process._config.copy() self._parent_pid = os.getpid() self._popen = None + self._closed = False self._target = target self._args = tuple(args) self._kwargs = dict(kwargs) @@ -85,6 +86,10 @@ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, self.daemon = daemon _dangling.add(self) + def _check_closed(self): + if self._closed: + raise ValueError("process object is closed") + def run(self): ''' Method to be run in sub-process; can be overridden in sub-class @@ -96,6 +101,7 @@ def start(self): ''' Start child process ''' + self._check_closed() assert self._popen is None, 'cannot start a process twice' assert self._parent_pid == os.getpid(), \ 'can only start a process object created by current process' @@ -110,12 +116,14 @@ def terminate(self): ''' Terminate process; sends SIGTERM signal or uses TerminateProcess() ''' + self._check_closed() self._popen.terminate() def join(self, timeout=None): ''' Wait until child process terminates ''' + self._check_closed() assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._popen is not None, 'can only join a started process' res = self._popen.wait(timeout) @@ -126,6 +134,7 @@ def is_alive(self): ''' Return whether process is alive ''' + self._check_closed() if self is _current_process: return True assert self._parent_pid == os.getpid(), 'can only test a child process' @@ -134,6 +143,20 @@ def is_alive(self): self._popen.poll() return self._popen.returncode is None + def close(self): + ''' + Close the Process object. + + This releases resources held by this object, but does not join or + terminate the child process. + ''' + if self._popen is not None: + self._popen.close() + self._popen = None + del self._sentinel + _children.discard(self) + self._closed = True + @property def name(self): return self._name @@ -295,6 +318,7 @@ def __init__(self): self._name = 'MainProcess' self._parent_pid = None self._popen = None + self._closed = False self._config = {'authkey': AuthenticationString(os.urandom(32)), 'semprefix': '/mp'} # Note that some versions of FreeBSD only allow named @@ -307,6 +331,9 @@ def __init__(self): # Everything in self._config will be inherited by descendant # processes. + def close(self): + pass + _current_process = _MainProcess() _process_counter = itertools.count(1) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index f1f93674935e7a..2f37d7b9cd2272 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -398,6 +398,67 @@ def test_sentinel(self): p.join() self.assertTrue(wait_for_handle(sentinel, timeout=1)) + @classmethod + def _test_close(cls, rc=0, q=None): + if q is not None: + q.get() + sys.exit(rc) + + def test_close(self): + if self.TYPE == "threads": + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + p = self.Process(target=self._test_close) + p.daemon = True + p.start() + p.join() + + self.assertEqual(p.is_alive(), False) + self.assertEqual(p.exitcode, 0) + p.close() + with self.assertRaises(ValueError): + p.is_alive() + with self.assertRaises(ValueError): + p.join() + with self.assertRaises(ValueError): + p.terminate() + p.close() + + wr = weakref.ref(p) + del p + gc.collect() + self.assertIs(wr(), None) + + def test_close_before_end(self): + if self.TYPE == "threads": + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + q = self.Queue() + p = self.Process(target=self._test_close, kwargs={'q': q}) + p.daemon = True + p.start() + + def _cleanup(): + q.put(None) + time.sleep(0.5) # let child die + + self.addCleanup(_cleanup) + + self.assertEqual(p.is_alive(), True) + self.assertIn(p, self.active_children()) + p.close() + self.assertNotIn(p, self.active_children()) + with self.assertRaises(ValueError): + p.is_alive() + with self.assertRaises(ValueError): + p.join() + with self.assertRaises(ValueError): + p.terminate() + + wr = weakref.ref(p) + del p + gc.collect() + self.assertIs(wr(), None) + + # # # From 08254ff342358d2ce79a8e0a5c9657f41e2c5dbc Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 12 Jun 2017 18:51:33 +0200 Subject: [PATCH 2/4] Raise ValueError if close() is called before the Process is finished running --- Lib/multiprocessing/process.py | 7 ++++-- Lib/test/_test_multiprocessing.py | 40 +++++++------------------------ 2 files changed, 13 insertions(+), 34 deletions(-) diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index c1cb6cfa2ad7e4..00a7cdd9ec4d12 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -147,10 +147,13 @@ def close(self): ''' Close the Process object. - This releases resources held by this object, but does not join or - terminate the child process. + This method releases resources held by the Process object. It is + an error to call this method if the child process is still running. ''' if self._popen is not None: + if self._popen.poll() is None: + raise ValueError("Cannot close a process while it is still running. " + "You should first call join() or terminate().") self._popen.close() self._popen = None del self._sentinel diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index e9173fa8800206..dbbdc7c8f70fb2 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -412,11 +412,17 @@ def _test_close(cls, rc=0, q=None): def test_close(self): if self.TYPE == "threads": self.skipTest('test not appropriate for {}'.format(self.TYPE)) - p = self.Process(target=self._test_close) + q = self.Queue() + p = self.Process(target=self._test_close, kwargs={'q': q}) p.daemon = True p.start() - p.join() + self.assertEqual(p.is_alive(), True) + # Child is still alive, cannot close + with self.assertRaises(ValueError): + p.close() + q.put(None) + p.join() self.assertEqual(p.is_alive(), False) self.assertEqual(p.exitcode, 0) p.close() @@ -433,36 +439,6 @@ def test_close(self): gc.collect() self.assertIs(wr(), None) - def test_close_before_end(self): - if self.TYPE == "threads": - self.skipTest('test not appropriate for {}'.format(self.TYPE)) - q = self.Queue() - p = self.Process(target=self._test_close, kwargs={'q': q}) - p.daemon = True - p.start() - - def _cleanup(): - q.put(None) - time.sleep(0.5) # let child die - - self.addCleanup(_cleanup) - - self.assertEqual(p.is_alive(), True) - self.assertIn(p, self.active_children()) - p.close() - self.assertNotIn(p, self.active_children()) - with self.assertRaises(ValueError): - p.is_alive() - with self.assertRaises(ValueError): - p.join() - with self.assertRaises(ValueError): - p.terminate() - - wr = weakref.ref(p) - del p - gc.collect() - self.assertIs(wr(), None) - def test_many_processes(self): if self.TYPE == 'threads': self.skipTest('test not appropriate for {}'.format(self.TYPE)) From d3d07c3f42791fa20f973e6e76ba744b7bb2c7ce Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 12 Jun 2017 19:22:45 +0200 Subject: [PATCH 3/4] Add docs --- Doc/library/multiprocessing.rst | 10 ++++++++++ Lib/multiprocessing/process.py | 5 +++++ 2 files changed, 15 insertions(+) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 6b4a8cbd299e4c..5265639edb975d 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -598,6 +598,16 @@ The :mod:`multiprocessing` package mostly replicates the API of the acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock. + .. method:: close() + + Close the :class:`Process` object, releasing all resources associated + with it. :exc:`ValueError` is raised if the underlying process + is still running. Once :meth:`close` returns successfully, most + other methods and attributes of the :class:`Process` object will + raise :exc:`ValueError`. + + .. versionadded:: 3.7 + Note that the :meth:`start`, :meth:`join`, :meth:`is_alive`, :meth:`terminate` and :attr:`exitcode` methods should only be called by the process that created the process object. diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 00a7cdd9ec4d12..70bb50d99911ca 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -200,6 +200,7 @@ def exitcode(self): ''' Return exit code of process or `None` if it has yet to stop ''' + self._check_closed() if self._popen is None: return self._popen return self._popen.poll() @@ -209,6 +210,7 @@ def ident(self): ''' Return identifier (PID) of process or `None` if it has yet to start ''' + self._check_closed() if self is _current_process: return os.getpid() else: @@ -222,6 +224,7 @@ def sentinel(self): Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination. ''' + self._check_closed() try: return self._sentinel except AttributeError: @@ -230,6 +233,8 @@ def sentinel(self): def __repr__(self): if self is _current_process: status = 'started' + elif self._closed: + status = 'closed' elif self._parent_pid != os.getpid(): status = 'unknown' elif self._popen is None: From 47170891252aaaab62798119c00b3018be0a44ab Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 24 Jun 2017 18:56:29 +0200 Subject: [PATCH 4/4] Add NEWS blurb --- .../NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst diff --git a/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst b/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst new file mode 100644 index 00000000000000..6b9e9a17c484f2 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-06-24-18-55-58.bpo-30596.VhB8iG.rst @@ -0,0 +1 @@ +Add a ``close()`` method to ``multiprocessing.Process``.