Python Process Poller

This is a convenience class which allows multiple subprocesses to be forked, and collects output from them as they run. The run_command() method should be passed a command line in list form as well as a unique context value which will be used to identify the command later (this can be any hashable value).

A sample BufferingProcPoller implementation is also provided which simply buffers all output from each process in a two-element list of [stdout, stderr] in the output dictionary of the class, whose keys are the contexts provided to run_command(). It's expected that most applications will need to provide their own derived versions for more robust behaviour.

The return codes of the processes are stored in the results attribute, and are None until the processes terminate. At this point, the handle_terminate() method is called, which can be overridden by derived classes to implement specific behaviour if required.

It should be safe to raise exceptions in either of the handle_X() methods and these will be propagated straight out of the poll() method, interrupting any other processing, but I haven't tested this much.

This code requires at least Python 2.6 (to the best of my knowledge) and also assumes a POSIX system - on Windows I believe one must use threads to collect output instead of select.poll().

procpoller.py
import os
import select
import signal
import subprocess
import time
 
class ProcPoller(object):
    """Watches multiple processes for output on stdout and stderr.

    Each process is started with run_command() and identified by a
    caller-supplied hashable context. poll() multiplexes the pipes of all
    running processes with select.poll() and reports activity through the
    handle_create(), handle_output() and handle_terminate() hooks, which
    derived classes may override.

    The ``results`` dictionary maps each context to the return code of its
    process; the value is None until the process has terminated.
    """

    # How often (in milliseconds) poll() re-checks processes which have
    # closed both output pipes but have not yet exited.
    CLOSED_POLL_INTERVAL_MS = 500

    # Largest write attempted per POLLOUT event. Writing at most PIPE_BUF
    # bytes to a pipe reported as writable does not block; 512 is the
    # minimum POSIX guarantees when select.PIPE_BUF is unavailable.
    __WRITE_CHUNK = getattr(select, "PIPE_BUF", 512)

    # Conditions which mean a descriptor will not become usable again.
    __HANGUP = select.POLLHUP | select.POLLERR | select.POLLNVAL

    def __init__(self):
        """Create a poller which is not yet watching any processes."""
        self.results = {}
        # file descriptor -> Popen object owning that pipe
        self.__fd_map = {}
        # context -> [Popen object, pending stdin bytes or None]; the second
        # item is None whenever the process cannot accept input.
        self.__procs = {}
        # Processes whose output pipes are closed but which are not reaped.
        self.__closed_procs = set()
        self.__poller = select.poll()


    def __del__(self):
        """Kill and reap every process which was started by this poller."""
        for proc_list in list(self.__procs.values()):
            try:
                proc_list[0].kill()
                proc_list[0].wait()
            except Exception:
                # Best effort only: the process has usually exited already
                # and nothing useful can be done with an error here.
                pass


    def run_command(self, cmdline, context, cmd_input=None):
        """Executes the specified command-line.

        cmdline is the command in list form and context is a unique
        hashable value used to identify the process in ``results`` and in
        the handle_X() hooks.

        By default, stdin of the process is not monitored and will remain
        attached to the controlling terminal. Passing a string for
        cmd_input will cause stdin to be attached to a pipe instead
        and the specified input to be sent to the process. Passing an
        empty string allows attaching stdin without sending any initial
        input. In either case, passing further input can be done later
        with send_input(), but if cmd_input was originally passed as None
        this will fail with an exception.

        Raises ValueError if context is already in use. If anything fails
        after the process has been started, the process is killed and all
        bookkeeping for it is removed before the exception is re-raised.
        """

        if context in self.__procs:
            raise ValueError("duplicate context value supplied")
        popen_args = {}
        if cmd_input is not None:
            popen_args["stdin"] = subprocess.PIPE
        proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, close_fds=True,
                                **popen_args)
        try:
            # Remember the context on the Popen object itself so that it can
            # be recovered from the descriptor map inside poll().
            proc.__context = context

            self.results[context] = None
            self.__procs[context] = [proc, None if cmd_input is None else b""]
            self.handle_create(self, context)

            for fd in (proc.stdout.fileno(), proc.stderr.fileno()):
                self.__fd_map[fd] = proc
                self.__poller.register(fd, select.POLLIN | select.POLLHUP)

            if cmd_input is not None:
                stdin_fd = proc.stdin.fileno()
                # Registered with an empty mask; send_input() switches it to
                # POLLOUT only while there is data waiting to be written.
                self.__poller.register(stdin_fd, 0)
                self.__fd_map[stdin_fd] = proc
                self.send_input(context, cmd_input)
        except BaseException:
            # In case of problems, make sure we terminate the subprocess, but
            # make sure the exception we re-throw isn't replaced in case the
            # cleanup code throws its own exception.
            try:
                raise
            finally:
                self.__abandon(proc, context)


    def __abandon(self, proc, context):
        """Undo a partially completed run_command() and kill the process."""
        for stream in (proc.stdin, proc.stdout, proc.stderr):
            if stream is None:
                continue
            try:
                fd = stream.fileno()
                self.__fd_map.pop(fd, None)
                self.__poller.unregister(fd)
            except Exception:
                # The descriptor was never registered or is already closed.
                pass
            try:
                stream.close()
            except Exception:
                pass
        self.__procs.pop(context, None)
        self.results.pop(context, None)
        try:
            proc.kill()
            proc.wait()
        except Exception:
            pass


    def send_input(self, context, cmd_input):
        """Send further input to an active process.

        The data is queued and written to the process from within poll().
        Text (non-bytes) input is encoded as UTF-8 before being queued.

        Raises KeyError if context is unknown and ValueError if the process
        was started without cmd_input or its stdin has since been closed.
        """

        if context not in self.__procs:
            raise KeyError("context not found")
        proc_list = self.__procs[context]
        if proc_list[1] is None:
            raise ValueError("specified process not open for input")
        if not isinstance(cmd_input, (bytes, bytearray)):
            cmd_input = cmd_input.encode("utf-8")
        had_pending = bool(proc_list[1])
        proc_list[1] += cmd_input
        # Only ask for POLLOUT when the queue goes from empty to non-empty;
        # otherwise it is already being watched.
        if not had_pending and cmd_input:
            self.__poller.modify(proc_list[0].stdin.fileno(), select.POLLOUT)


    def send_signal(self, context, sig=signal.SIGTERM):
        """Send signal to specified process.

        Raises KeyError if context is unknown.
        """

        if context not in self.__procs:
            raise KeyError("context not found")
        self.__procs[context][0].send_signal(sig)


    def poll(self, timeout=None):
        """Collect output from processes until stopped or timeout (in secs).

        Returns once every watched process has closed its output pipes and
        terminated, or once timeout seconds have elapsed if timeout is not
        None. Exceptions raised by handle_output() or handle_terminate()
        propagate to the caller.
        """

        deadline = None if timeout is None else time.time() + timeout

        while self.__fd_map or self.__closed_procs:
            if deadline is None:
                wait_ms = None
            else:
                wait_ms = max(0, (deadline - time.time()) * 1000)

            # While there are processes who have closed file descriptors but
            # not yet terminated, check their status regularly. This must
            # also apply when no overall timeout was requested, otherwise
            # the wait below would never return.
            if self.__closed_procs:
                interval = self.CLOSED_POLL_INTERVAL_MS
                if wait_ms is None or wait_ms > interval:
                    wait_ms = interval

            for fd, events in self.__poller.poll(wait_ms):
                self.__dispatch(fd, events)

            self.__reap_closed()

            if deadline is not None and time.time() >= deadline:
                break


    def __dispatch(self, fd, events):
        """Route one poll event to the stdin or output handler."""
        proc = self.__fd_map.get(fd)
        if proc is None:
            # The descriptor was dropped while handling an earlier event of
            # the same batch (e.g. stdin closed with the last output pipe).
            return
        proc_list = self.__procs[proc.__context]
        stdin = proc.stdin
        if stdin is not None and not stdin.closed and fd == stdin.fileno():
            self.__service_stdin(proc, proc_list, fd, events)
        else:
            self.__service_output(proc, proc_list, fd, events)


    def __service_stdin(self, proc, proc_list, fd, events):
        """Write queued input to a process, or give up if it stopped reading."""
        import errno

        if events & self.__HANGUP:
            # Nobody is reading any more, so queued input is undeliverable.
            self.__close_stdin(proc, proc_list)
            return
        if events & select.POLLOUT:
            if proc_list[1]:
                try:
                    sent = os.write(fd, proc_list[1][:self.__WRITE_CHUNK])
                except OSError as exc:
                    if exc.errno != errno.EPIPE:
                        raise
                    # The reader went away between poll() and write().
                    self.__close_stdin(proc, proc_list)
                    return
                proc_list[1] = proc_list[1][sent:]
            if not proc_list[1]:
                # Nothing left to send; stop watching for writability.
                self.__poller.modify(fd, 0)


    def __close_stdin(self, proc, proc_list):
        """Stop watching and close the stdin pipe of a process."""
        fd = proc.stdin.fileno()
        self.__poller.unregister(fd)
        del self.__fd_map[fd]
        # Makes any later send_input() fail with ValueError.
        proc_list[1] = None
        try:
            proc.stdin.close()
        except (IOError, OSError):
            # The pipe may already be broken; it is being discarded anyway.
            pass


    def __service_output(self, proc, proc_list, fd, events):
        """Read available output from a pipe and close it at end of file."""
        # The descriptor being serviced is still open, so it is stderr
        # exactly when it is not the (open) stdout descriptor.
        is_stderr = proc.stdout.closed or fd != proc.stdout.fileno()

        at_eof = False
        if events & select.POLLIN:
            data = os.read(fd, 4096)
            if data:
                self.handle_output(self, proc.__context, is_stderr, data)
            else:
                at_eof = True
        elif events & self.__HANGUP:
            at_eof = True
        if not at_eof:
            # A hang-up may be reported while data is still buffered in the
            # pipe; keep the descriptor open until it has been drained.
            return

        self.__poller.unregister(fd)
        del self.__fd_map[fd]
        if is_stderr:
            proc.stderr.close()
        else:
            proc.stdout.close()
        if proc.stdout.closed and proc.stderr.closed:
            if proc_list[1] is not None:
                self.__close_stdin(proc, proc_list)
            self.__closed_procs.add(proc)


    def __reap_closed(self):
        """Record return codes of processes which have now terminated."""
        dead_procs = set()
        try:
            for proc in self.__closed_procs:
                ret = proc.poll()
                if ret is not None:
                    self.results[proc.__context] = ret
                    dead_procs.add(proc)
                    self.handle_terminate(self, proc.__context)
        finally:
            # Forget reaped processes even if handle_terminate() raised.
            self.__closed_procs -= dead_procs


    def handle_output(self, poller, context, is_stderr, data):
        """Derived classes can override to intercept output.

        Called from poll() with this poller, the context of the process,
        whether the data came from stderr rather than stdout, and the data
        read from the pipe.
        """
        pass


    def handle_terminate(self, poller, context):
        """Derived classes can override to detect terminate.

        Called from poll() after the return code has been stored in
        ``results`` for the given context.
        """
        pass


    def handle_create(self, poller, context):
        """Derived classes can override to hook in to creation.

        Called from run_command() after the process has been started.
        """
        pass
 
 
 
class BufferingProcPoller(ProcPoller):
    """A sample ProcPoller implementation which simply buffers output.

    All output of each process is accumulated in the ``output`` dictionary,
    keyed by the context passed to run_command(). Each value is a
    two-element list of byte strings: [stdout, stderr].
    """

    def __init__(self):
        """Initialise output buffer dict."""

        super(BufferingProcPoller, self).__init__()
        self.output = {}


    def handle_create(self, poller, context):
        """Add empty output buffer entries."""

        # Byte strings, because the data delivered to handle_output() comes
        # from os.read(); on Python 2 b"" is the same object type as "".
        self.output[context] = [b"", b""]


    def handle_output(self, poller, context, is_stderr, data):
        """Buffer up output."""

        index = 1 if is_stderr else 0
        self.output[context][index] += data
        return False