diff --git a/concurrent/futures/process.py b/concurrent/futures/process.py index 665da8e..2ebc13f 100644 --- a/concurrent/futures/process.py +++ b/concurrent/futures/process.py @@ -119,11 +119,16 @@ def _process_worker(call_queue, result_queue): worker that it should exit when call_queue is empty. """ while True: - call_item = call_queue.get(block=True) - if call_item is None: - # Wake up queue management thread - result_queue.put(None) - return + try: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return + # since all child processes get the interrupt as well, we'll just pass + # on the exception and rely on our parent to handle this for us + except KeyboardInterrupt: + continue try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException: @@ -356,4 +361,29 @@ def shutdown(self, wait=True): self._processes = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ + def terminate(self, wait=True): + '''Method to terminate the ProcessPoolExecutor while jobs are in-flight + ''' + with self._shutdown_lock: + self._shutdown_thread = True + # drop pending work items + self._pending_work_items.clear() + # kill all processes + for p in set(self._processes): + p.terminate() + p.join() + self._processes.remove(p) + + if self._queue_management_thread: + # Wake up queue management thread + self._result_queue.put(None) + if wait: + self._queue_management_thread.join() + # To reduce the risk of openning too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._call_queue = None + self._result_queue = None + self._processes = None + atexit.register(_python_exit)