User Tools

Site Tools


notes:python_process_poller

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Next revision
Previous revision
notes:python_process_poller [2013/02/13 16:12]
andy created
notes:python_process_poller [2013/07/22 12:32]
andy
Line 3: Line 3:
 This is a convenience class which allows multiple subprocesses to be forked, and collects output from them as they run. The ''run_command()'' method should be passed a command-line in ''list'' form as well as a unique context value which will be used to identify the command later (this can be any hashable value). This is a convenience class which allows multiple subprocesses to be forked, and collects output from them as they run. The ''run_command()'' method should be passed a command-line in ''list'' form as well as a unique context value which will be used to identify the command later (this can be any hashable value).
  
-The base class implementation simply buffers all output from each process in a 2-tuple of ''(stdout, stderr)'' in the ''output'' dictionary of the class, whose keys are the context provided to ''run_command()''. The ''handle_output()'' method can be overridden by derived classes to modify this behaviour, but bear in mind that ''output'' won't be populated unless the base class version is called.+A sample ''BufferingProcPoller'' implementation is also provided which simply buffers all output from each process in a 2-tuple of ''(stdout, stderr)'' in the ''output'' dictionary of the class, whose keys are the context provided to ''run_command()''. It's expected that most applications will need to provide their own derived versions for more robust behaviour.
  
-The return codes of the processes are stored in the ''​results''​ attribute, and are ''​None''​ until the processes terminate.+The return codes of the processes are stored in the ''​results''​ attribute, and are ''​None''​ until the processes terminate. At this point, the ''​handle_terminate()''​ method is called, which can be overridden by derived classes to implement specific behaviour if required.
  
 +It should be safe to raise exceptions in either of the ''handle_X()'' methods and these will be propagated straight out of the ''poll()'' method, interrupting any other processing, but I haven't tested this much.
  
-<​code ​python procpoller.py> +<note>​This ​code requires at least Python 2.6 (to the best of my knowledge) and also assumes a POSIX system - on Windows I believe one must use threads to collect output instead of ''​select.poll()''​.<​/note>
-#!/usr/​bin/​python+
  
 +
 +<code python procpoller.py>​
 import os import os
 import select import select
Line 16: Line 18:
 import subprocess import subprocess
 import time import time
- 
  
 class ProcPoller(object):​ class ProcPoller(object):​
Line 22: Line 23:
  
     def __init__(self):​     def __init__(self):​
-        self.fd_map = {} 
         self.results = {}         self.results = {}
-        self.output ​= {} +        self.__fd_map ​= {} 
-        self.poller ​= select.poll()+        self.__procs = {} 
 +        self.__closed_procs = set() 
 +        self.__poller ​= select.poll() 
 +         
 +         
 +    def __del__(self):​ 
 +        for context, proc_list in self.__procs.iteritems():​ 
 +            try: 
 +                proc_list[0].kill() 
 +                proc_list[0].wait() 
 +            except Exception:​ 
 +                pass
  
  
-    def run_command(self,​ cmdline, context): +    def run_command(self,​ cmdline, context, cmd_input=None): 
-        """​Executes the specified command-line."""​+        """​Executes the specified command-line. 
 + 
 +        By default, stdin of the process is not monitored and will remain 
 +        attached to the controlling terminal. Passing a string for 
 +        cmd_input will cause stdin to be attached to a pipe instead 
 +        and the specified input to be sent to the process. Passing an 
 +        empty string allows attaching stdin without sending any initial 
 +        input. In either case, passing further input can be done later 
 +        with send_input(),​ but if cmd_input was originally passed as None 
 +        this will fail with an exception. 
 +        ​"""​
  
-        if context in self.results:+        if context in self.__procs:
             raise ValueError("​duplicate context value supplied"​)             raise ValueError("​duplicate context value supplied"​)
 +        popen_args = {}
 +        if cmd_input is not None:
 +            popen_args["​stdin"​] = subprocess.PIPE
         proc = subprocess.Popen(cmdline,​ stdout=subprocess.PIPE,​         proc = subprocess.Popen(cmdline,​ stdout=subprocess.PIPE,​
-                                stderr=subprocess.PIPE,​ close_fds=True) +                                stderr=subprocess.PIPE,​ close_fds=True
-        proc.__context = context +                                **popen_args
-        self.results[context] = None +        ​try: 
-        self.output[context] = ["",​ ""​]+            ​proc.__context = context
  
-        ​for fd in (i.fileno() for i in (proc.stdout,​ proc.stderr)):​ +            self.results[context] = None 
-            self.fd_map[fd] = proc +            self.__procs[context] = [proc, None if cmd_input is None else ""​] 
-            self.poller.register(fd,​ select.POLLIN | select.POLLHUP)+            self.handle_create(self,​ context) 
 + 
 +            ​for fd in (i.fileno() for i in (proc.stdout,​ proc.stderr)):​ 
 +                self.__fd_map[fd] = proc 
 +                self.__poller.register(fd,​ select.POLLIN | select.POLLHUP
 + 
 +            if cmd_input is not None: 
 +                self.__poller.register(proc.stdin.fileno(),​ 0) 
 +                self.__fd_map[proc.stdin.fileno()] = proc 
 +                self.send_input(context,​ cmd_input) 
 +        except: 
 +            # In case of problems, make sure we terminate the subprocess, but 
 +            # make sure the exception we re-throw isn't replaced in case the 
 +            # cleanup code throws its own exception. 
 +            try: 
 +                raise 
 +            finally: 
 +                try: 
 +                    proc.kill() 
 +                    proc.wait() 
 +                except: 
 +                    pass 
 + 
 + 
 +    def send_input(self,​ context, cmd_input):​ 
 +        """​Send further output to an active process."""​ 
 + 
 +        if context not in self.__procs:​ 
 +            raise KeyError("​context not found"​) 
 +        if self.__procs[context][1] is None: 
 +            raise ValueError("​specified process not open for input"​) 
 +        old_input = bool(self.__procs[context][1]) 
 +        self.__procs[context][1] += cmd_input 
 +        if not old_input and cmd_input:​ 
 +            self.__poller.modify(self.__procs[context][0].stdin.fileno(),​ 
 +                                 ​select.POLLOUT) 
 + 
 + 
 +    def send_signal(self,​ context, sig=signal.SIGTERM):​ 
 +        """​Send signal to specified process."""​ 
 + 
 +        if context not in self.__procs:​ 
 +            raise KeyError("​context not found"​) 
 +        self.__procs[context][0].send_signal(sig)
  
  
Line 50: Line 117:
         timeout = time.time() + timeout if timeout is not None else None         timeout = time.time() + timeout if timeout is not None else None
  
-        while self.fd_map+        while self.__fd_map or self.__closed_procs: 
-            ​dead_procs ​set()+ 
 +            ​# While there are processes who have closed file descriptors but 
 +            # not yet terminated, check their status at least every 500ms. 
 +            if self.__closed_procs:​ 
 +                poll_timeout ​min(poll_timeout,​ 500) 
 + 
 +            for fd, events in self.__poller.poll(poll_timeout):​ 
 +                proc = self.__fd_map[fd] 
 +                if events & select.POLLOUT:​ 
 +                    proc_list = self.__procs[proc.__context] 
 +                    if proc_list[1]:​ 
 +                        sent = os.write(fd,​ proc_list[1]) 
 +                        proc_list[1] = proc_list[1][sent:​] 
 +                    if not proc_list[1]:​ 
 +                        self.__poller.modify(fd, 0)
  
-            for fd, events in self.poller.poll(poll_timeout):​ 
-                proc = self.fd_map[fd] 
                 if events & select.POLLIN:​                 if events & select.POLLIN:​
-                    if (self.handle_output(proc.__context,​ +                    ​data = os.read(fd, 4096) 
-                                           fd == proc.stderr.fileno(),​ +                    ​if data: 
-                                           ​os.read(fd,​ 4096))): +                        ​self.handle_output(self, proc.__context,​ 
-                        proc.send_signal(signal.SIGTERM)+                                           fd == proc.stderr.fileno(), ​data)
                 if events & select.POLLHUP:​                 if events & select.POLLHUP:​
-                    self.handle_terminate(proc.__context+                    self.__poller.unregister(fd) 
-                    ​dead_procs.add(proc)+                    del self.__fd_map[fd] 
 +                    if not proc.stdout.closed and proc.stdout.fileno() == fd: 
 +                        proc.stdout.close(
 +                    ​elif not proc.stderr.closed and proc.stderr.fileno() == fd: 
 +                        proc.stderr.close() 
 +                    if proc.stdout.closed and proc.stderr.closed:​ 
 +                        if self.__procs[proc.__context][1] is not None: 
 +                            self.__poller.unregister(proc.stdin.fileno()) 
 +                            del self.__fd_map[proc.stdin.fileno()] 
 +                            proc.stdin.close() 
 +                        self.__closed_procs.add(proc)
  
-            for proc in dead_procs+            ​dead_procs = set() 
-                self.results[proc.__context] = proc.wait() +            try: 
-                out, err = proc.stdout.fileno(), proc.stderr.fileno(+                ​for proc in self.__closed_procs
-                for i in out, err+                    ret = proc.poll() 
-                    self.poller.unregister(i) +                    if ret is not None: 
-                    del self.fd_map[i]+                        ​self.results[proc.__context] = ret 
 +                        dead_procs.add(proc
 +                        self.handle_terminate(self, proc.__context
 +            ​finally
 +                self.__closed_procs -= dead_procs
  
             if timeout is not None:             if timeout is not None:
Line 78: Line 171:
  
  
-    def handle_output(self,​ context, ​stderr, data): +    def handle_output(self, poller, context, ​is_stderr, data): 
-        """​Derived classes can override to intercept output.+        """​Derived classes can override to intercept output."""​ 
 +        pass
  
-        Returning True from this function will send SIGTERM to the process. 
-        """​ 
  
-        index = 1 if stderr else 0 +    ​def handle_terminate(self, poller, context):
-        self.output[context][index] += data +
-        return False +
- +
- +
-    ​def handle_terminate(self,​ context):+
         """​Derived classes can override to detect terminate."""​         """​Derived classes can override to detect terminate."""​
 +        pass
  
 +
 +    def handle_create(self,​ poller, context):
 +        """​Derived classes can override to hook in to creation."""​
         pass         pass
 +
 +
 +
 +class BufferingProcPoller(ProcPoller):​
 +    """​A sample ProcPoller implementation which simply buffers output."""​
 +
 +    def __init__(self):​
 +        """​Initialise output buffer dict."""​
 +        ​
 +        super(BufferingProcPoller,​ self).__init__()
 +        self.output = {}
 +
 +
 +    def handle_create(self,​ poller, context):
 +        """​Add empty output buffer entries."""​
 +
 +        self.output[context] = ["",​ ""​] ​       ​
 +
 +
 +    def handle_output(self,​ poller, context, is_stderr, data):
 +        """​Buffer up output."""​
 +
 +        index = 1 if is_stderr else 0
 +        self.output[context][index] += data
 +        return False
 +
 </​code>​ </​code>​
  
notes/python_process_poller.txt · Last modified: 2013/07/22 12:32 by andy