
Python Executor that kills processes after a timeout


Problem

Imagine you want to run a bunch of tasks in parallel, but you can't be sure that all of them will eventually terminate; some may instead decide to eat up all your memory and CPU time. This should be easy, right? Create a new thread, run your task, and kill the thread if it doesn't terminate within a given time slot.

Well, here's how Python fails to handle this:

  • Threads are out in the first place: due to the Global Interpreter Lock our tasks won't actually run in parallel, so we won't benefit from multicore CPUs at all. Also, we can't terminate threads in Python, so there's that.



  • multiprocessing: Apparently the way to parallelize stuff in Python is to fork your main process and to handle all the inter-process communication by yourself. Also, communication works by serializing objects using pickle, which comes with a few minor limitations like the inability to serialize functions that are not defined at the top level of a module. But at least we can terminate a process that is running havoc.



  • Process Pools solve a lot of the hassle that comes with inter-process communication, but there is no built-in way to set a timeout that actually stops a task. Bummer.



  • Executors are an option, especially the ProcessPoolExecutor. It has a map() function that lets us define a timeout after which a task is skipped. But "skipped" doesn't mean that the underlying process gets killed. In fact, the process keeps running until it terminates by itself, which may be never, as the sketch below demonstrates.
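A minimal sketch of that behavior (not from the original post, just CPython's documented ProcessPoolExecutor.map() semantics): the timeout raises in the consumer, while the worker process lives on.

```
import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError

def slow(x):
    time.sleep(5)  # stands in for a task that may never finish
    return x

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        try:
            for result in pool.map(slow, [1], timeout=1):
                print(result)
        except TimeoutError:
            print("map() gave up after 1s, but nothing was killed")
    # The program still takes ~5 seconds to exit: shutdown() waits for the
    # worker, which happily keeps executing slow() after the "timeout".
```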



So finally I decided to build something that actually solves my problem. The interface is quite similar to Python's Executor class, but it ensures that processes are actually killed after a timeout, at the cost of forking a process for each function call. This means you shouldn't use this class for lightweight tasks or the overhead will considerably slow you down.
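To get a feel for that overhead, here is a throwaway benchmark sketch (not part of the original post; absolute numbers depend on your platform and start method):

```
import time
from multiprocessing import Process

def noop():
    pass

if __name__ == "__main__":
    start = time.perf_counter()
    for _ in range(100):
        p = Process(target=noop)  # one fresh process per call ...
        p.start()
        p.join()                  # ... started and joined, like the class does
    print(f"100 process spawns: {time.perf_counter() - start:.2f}s")
    # versus microseconds for calling noop() directly
```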

```
import os
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Manager, Process
from typing import Any, Callable, Dict, Iterable

# ... (the ProcessKillingExecutor class under review follows in the original post)
```

Solution

First, the obligatory alternative solution:

```
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process, Queue
from os import getpid
from time import sleep

def timer(f):
    # NB: this relies on the "fork" start method; under "spawn" the
    # decorated job can no longer be pickled by name for the child process.
    def wrapper(job_args, *args, **kwargs):
        fn_args, timeout, timeout_callback = job_args[:3]
        q = Queue()
        p = Process(target=f, args=(q, fn_args), kwargs=kwargs)
        p.start()
        p.join(timeout=timeout)
        if p.is_alive():  # only kill the process if it outlived its timeout
            p.terminate()
            p.join()      # reap the terminated child (see below on zombies)
        if not q.empty():
            return q.get()
        return timeout_callback(fn_args, args, kwargs)
    return wrapper

@timer
def job(q, file, *args, **kwargs):
    sleep(3)
    print(file, getpid())
    q.put(file + "_done")

def timeout_callback(*args, **kwargs):
    print("Timeout")

def main():
    timeout = 2
    data = ["file1", "file2", "file3", "file4", "file5"]
    tp = ThreadPoolExecutor(2)

    data = [(x, timeout, timeout_callback) for x in data]
    for got in tp.map(job, data):
        print(got)

if __name__ == "__main__":
    main()
```
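With these numbers every call overruns its budget: job() sleeps for 3 seconds against a 2-second timeout, so the run prints "Timeout" five times from the callback and None five times from the map() loop, since the wrapper returns the callback's result.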


In your __init__ you have:

```
def __init__(self, max_workers: int = None):
    ...
    super().__init__()
```


but you don't have a superclass in:

```
class ProcessKillingExecutor:
```


You could add:

```
class ProcessKillingExecutor(object):
```


for clarity; otherwise it looks as if you're calling the super of a base class.

The manager might be unnecessary; you are only ever transferring one value. What you are looking for might be a Queue.
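A minimal sketch of that suggestion (the names work and q are mine, not from the post): a plain multiprocessing.Queue is enough to hand a single result back from the child.

```
from multiprocessing import Process, Queue

def work(q, x):
    q.put(x * 2)      # the child pushes its one and only result

if __name__ == "__main__":
    q = Queue()
    p = Process(target=work, args=(q, 21))
    p.start()
    p.join(timeout=1)
    if p.is_alive():  # the child outlived its timeout: kill and reap it
        p.terminate()
        p.join()
    print(q.get() if not q.empty() else "no result")
```

No Manager process, no shared dict, and each call gets its own queue, which also avoids the overwriting problem discussed further down.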

Stick to 80 characters per line, because it is more readable:

"""
    The ProcessKillingExecutor works like an `Executor `_
    in that it uses a bunch of processes to execute calls to a function with different arguments asynchronously.

    But other than the `ProcessPoolExecutor `_,
    the ProcessKillingExecutor forks a new Process for each function call that terminates after the function returns or
    if a timeout occurs.

    This means that contrary to the Executors and similar classes provided by the Python Standard Library, you can
    rely on the fact that a process will get killed if a timeout occurs and that absolutely no side can occur between
    function calls.

    Note that descendant processes of each process will not be terminated – they will simply become orphaned.
    """


Do it like this:

"""
    The ProcessKillingExecutor works like an `Executor
    `_
    in that it uses a bunch of processes to execute calls to a function with
    different arguments asynchronously.

    But other than the `ProcessPoolExecutor
    `_,
    the ProcessKillingExecutor forks a new Process for each function call that
    terminates after the function returns or if a timeout occurs.

    This means that contrary to the Executors and similar classes provided by
    the Python Standard Library, you can rely on the fact that a process will
    get killed if a timeout occurs and that absolutely no side can occur
    between function calls.

    Note that descendant processes of each process will not be terminated –
    they will simply become orphaned.
    """


The exception being URLs; in other words, it's good practice not to line-break URLs.

Your annotations are slightly off:

```
def submit(self, func: Callable = None, args: Any = (), kwargs: Dict = {}, timeout: float = None,
           callback_timeout: Callable[[Any], Any] = None, daemon: bool = True):
```


This should look like the following, with one parameter per line and without the default values, given that the parameters will always be supplied:

```
def submit(self,
           func: Callable,
           fn_args: Any,
           p_kwargs: Dict,
           timeout: float,
           callback_timeout: Callable[[Any], Any],
           daemon: bool):
```
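Dropping the defaults also sidesteps a classic pitfall in the original signature (shown here with a hypothetical stand-alone submit, not the class under review): a mutable default such as kwargs: Dict = {} is created once at definition time and shared across calls.

```
def submit(task, kwargs={}):  # one dict, created once, shared by every call
    kwargs[task] = True
    return kwargs

print(submit("a"))  # {'a': True}
print(submit("b"))  # {'a': True, 'b': True}  <- state leaked between calls
```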


Encapsulation: you are not working with kwargs and args, so don't name them as such, in the same sense that you should not name every variable i; it is very confusing.

You are dealing with three different sets of args and kwargs: those of the job, those of the executing process, and those of the class itself. Args and kwargs that concern the class should belong to the class, whether they are needed or not.

```
params = ({'func': func, 'fn_args': p_args, 'p_kwargs': {},
           'timeout': timeout, 'callback_timeout': callback_timeout,
           'daemon': daemon} for p_args in iterable)
```


Terminating and joining the processes of a program:

```
if p.is_alive():
    p.terminate()
    p.join()
```


Terminating and joining are not the same thing. Check your task manager: without the join you will see the processes lingering as "zombies". The call terminate() forces the process to exit, while join() waits for it to die and lets the parent reap its exit status; if you don't call join() on terminated processes, you will get zombies.
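A POSIX-only sketch of that effect (assumes Linux or macOS and the ps tool, neither of which the post mentions): the terminated child stays a zombie until the parent reaps it.

```
import os
import time
from multiprocessing import Process

if __name__ == "__main__":
    p = Process(target=time.sleep, args=(60,))
    p.start()
    p.terminate()                             # the child exits, but is not reaped yet
    time.sleep(0.5)                           # give the signal a moment to land
    os.system(f"ps -o pid,stat -p {p.pid}")   # STAT column shows Z: a zombie
    p.join()                                  # reaps the exit status ...
    os.system(f"ps -o pid,stat -p {p.pid}")   # ... and the zombie is gone
```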

It's confusing that you use a manager; it gives the impression that you are going to use the result to exchange data between processes, but this is not the case. You are just retrieving a result from a child process.

It is also dangerous, because you are always overwriting the same key "result" in the manager dict. Think about it: every time you read from the manager dict there can be leakage, i.e. the value may already have been overwritten by another call before you get to it.


Context

StackExchange Code Review Q#142828, answer score: 9
