User Tools

Site Tools


notes:python_process_poller

This is an old revision of the document!


Python Process Poller

This is a convenience class which allows multiple subprocesses to be forked, and collects output from them as they run. The run_command() method should be passed a command line in list form as well as a unique context value which will be used to identify the command later (this can be any hashable value).

The base class implementation simply buffers all output from each process in a two-element list of [stdout, stderr] in the output dictionary of the class, whose keys are the context values provided to run_command(). The handle_output() method can be overridden by derived classes to modify this behaviour, but bear in mind that output won't be populated unless the base class version is called.

The return codes of the processes are stored in the results attribute, and are None until the processes terminate.

procpoller.py
#!/usr/bin/python
 
import os
import select
import signal
import subprocess
import time
 
 
class ProcPoller(object):
    """Watches multiple processes for output on stdout and stderr.

    Attributes:
        fd_map: maps every pipe file descriptor that is still being watched
            to the ``subprocess.Popen`` object that owns it.
        results: maps each context to the return code of its process, or
            ``None`` while the process has not been reaped yet.
        output: maps each context to a two-element list
            ``[stdout_data, stderr_data]`` holding the bytes buffered so far
            by the base-class ``handle_output()``.
        poller: the ``select.poll()`` object the pipes are registered with.
    """

    def __init__(self):
        self.fd_map = {}
        self.results = {}
        self.output = {}
        self.poller = select.poll()

    def run_command(self, cmdline, context):
        """Start ``cmdline`` and begin watching its stdout and stderr.

        Args:
            cmdline: the command line in list form, passed to
                ``subprocess.Popen``.
            context: a hashable value identifying this command; it is the
                key used in ``results`` and ``output`` and is handed back to
                ``handle_output()`` / ``handle_terminate()``.

        Raises:
            ValueError: if ``context`` has already been used.
        """

        if context in self.results:
            raise ValueError("duplicate context value supplied")
        proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, close_fds=True)
        # The double underscore is name-mangled by the class body, so the
        # attribute cannot clash with anything Popen defines itself.
        proc.__context = context
        self.results[context] = None
        # os.read() yields bytes, so the buffers must be bytes as well
        # (on Python 2 b"" and "" are the same type).
        self.output[context] = [b"", b""]

        for fd in (proc.stdout.fileno(), proc.stderr.fileno()):
            self.fd_map[fd] = proc
            self.poller.register(fd, select.POLLIN | select.POLLHUP)

    def poll(self, timeout=None):
        """Collect output from processes until stopped or timeout (in secs).

        Returns once every watched pipe has reached end-of-file, or once
        ``timeout`` seconds have elapsed (``None`` means no time limit).
        A process is reaped only after BOTH of its pipes have been drained,
        so no buffered output is discarded and ``handle_terminate()`` is
        invoked exactly once per process.
        """

        if timeout is None:
            deadline = poll_timeout = None
        else:
            deadline = time.time() + timeout
            poll_timeout = timeout * 1000  # select.poll() wants milliseconds

        # Conditions under which a descriptor will never yield data again.
        closed_events = select.POLLHUP | select.POLLERR | select.POLLNVAL

        while self.fd_map:
            for fd, events in self.poller.poll(poll_timeout):
                proc = self.fd_map[fd]
                if events & select.POLLIN:
                    # POLLHUP may be set together with POLLIN while data is
                    # still buffered; keep reading until the pipe is empty
                    # and only then treat the descriptor as finished.
                    data = os.read(fd, 4096)
                    at_eof = not data
                    if data and self.handle_output(proc.__context,
                                                   fd == proc.stderr.fileno(),
                                                   data):
                        proc.send_signal(signal.SIGTERM)
                else:
                    at_eof = bool(events & closed_events)

                if at_eof:
                    self._close_fd(fd, proc)

            if deadline is not None:
                now = time.time()
                if now >= deadline:
                    break
                poll_timeout = (deadline - now) * 1000

    def _close_fd(self, fd, proc):
        """Stop watching ``fd``; reap ``proc`` once none of its pipes remain."""

        self.poller.unregister(fd)
        del self.fd_map[fd]
        if proc in self.fd_map.values():
            # The sibling pipe is still open and may hold unread output.
            return

        context = proc.__context
        self.handle_terminate(context)
        self.results[context] = proc.wait()
        # The pipes are closed only now, so their descriptor numbers cannot
        # be reused while one of them is still registered with the poller.
        proc.stdout.close()
        proc.stderr.close()

    def handle_output(self, context, stderr, data):
        """Derived classes can override to intercept output.

        Args:
            context: the context value given to ``run_command()``.
            stderr: true if ``data`` came from stderr, false for stdout.
            data: the non-empty chunk of bytes just read from the pipe.

        Returning True from this function will send SIGTERM to the process.
        The base implementation appends ``data`` to ``output`` and returns
        False.
        """

        index = 1 if stderr else 0
        self.output[context][index] += data
        return False

    def handle_terminate(self, context):
        """Derived classes can override to detect terminate.

        Called once per process, after both of its pipes have reached
        end-of-file and just before its return code is stored in
        ``results``.
        """

        pass
notes/python_process_poller.1360771931.txt.gz · Last modified: 2013/02/13 16:12 by andy