From f5727bdc66d91c56ce8d8e642cf04a0d1be62500 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Thu, 17 Dec 2015 19:18:46 -0800 Subject: [PATCH 1/2] Add a terminate() method to ProcessPoolExecutor --- concurrent/futures/process.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/concurrent/futures/process.py b/concurrent/futures/process.py index 665da8e..4afa20a 100644 --- a/concurrent/futures/process.py +++ b/concurrent/futures/process.py @@ -356,4 +356,29 @@ def shutdown(self, wait=True): self._processes = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ + def terminate(self, wait=True): + '''Method to terminate the ProcessPoolExecutor while jobs are in-flight + ''' + with self._shutdown_lock: + self._shutdown_thread = True + # drop pending work items + self._pending_work_items.clear() + # kill all processes + for p in set(self._processes): + p.terminate() + p.join() + self._processes.remove(p) + + if self._queue_management_thread: + # Wake up queue management thread + self._result_queue.put(None) + if wait: + self._queue_management_thread.join() + # To reduce the risk of openning too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._call_queue = None + self._result_queue = None + self._processes = None + atexit.register(_python_exit) From c2ae8dc4722254d1042fdbbcd60712533c5693fd Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Thu, 17 Dec 2015 19:19:30 -0800 Subject: [PATCH 2/2] Fix for uncaught KeyboardInterrupt exceptions in ProcessPoolExecutor worker processes Traces would look something like: ``` Traceback (most recent call last): File "/usr/lib64/python2.6/multiprocessing/process.py", line 232, in _bootstrap self.run() File "/usr/lib64/python2.6/multiprocessing/process.py", line 88, in run self._target(*self._args, **self._kwargs) File "/home/thjackso/src/pythonfutures/concurrent/futures/process.py", line 122, in _process_worker call_item = call_queue.get(block=True) File "/usr/lib64/python2.6/multiprocessing/queues.py", line 91, in get res = self._recv() ``` --- concurrent/futures/process.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/concurrent/futures/process.py b/concurrent/futures/process.py index 4afa20a..2ebc13f 100644 --- a/concurrent/futures/process.py +++ b/concurrent/futures/process.py @@ -119,11 +119,16 @@ def _process_worker(call_queue, result_queue): worker that it should exit when call_queue is empty. """ while True: - call_item = call_queue.get(block=True) - if call_item is None: - # Wake up queue management thread - result_queue.put(None) - return + try: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return + # since all child processes get the interrupt as well, we'll just pass + # on the exception and rely on our parent to handle this for us + except KeyboardInterrupt: + continue try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException: