User Tools

Site Tools


notes:python_process_poller

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revision Previous revision
Next revision
Previous revision
Last revision Both sides next revision
notes:python_process_poller [2013/07/22 11:06]
andy
notes:python_process_poller [2013/07/22 12:30]
andy
Line 3: Line 3:
 This is a convenience class which allows multiple subprocesses to be forked, and collects output from them as they run. The ''run_command()'' method should be passed a command line in ''list'' form as well as a unique context value which will be used to identify the command later (this can be any hashable value). This is a convenience class which allows multiple subprocesses to be forked, and collects output from them as they run. The ''run_command()'' method should be passed a command line in ''list'' form as well as a unique context value which will be used to identify the command later (this can be any hashable value).
  
-The base class implementation simply buffers all output from each process in a 2-tuple of ''(stdout, stderr)'' in the ''output'' dictionary of the class, whose keys are the context provided to ''run_command()''. The ''handle_output()'' method can be overridden by derived classes to modify this behaviour, but bear in mind that ''output'' won't be populated unless the base class version is called. +A sample ''BufferingProcPoller'' implementation is also provided which simply buffers all output from each process in a two-element list of ''[stdout, stderr]'' in the ''output'' dictionary of the class, whose keys are the context provided to ''run_command()''. It's expected that most applications will need to provide their own derived versions for more robust behaviour.
  
 The return codes of the processes are stored in the ''​results''​ attribute, and are ''​None''​ until the processes terminate. At this point, the ''​handle_terminate()''​ method is called, which can be overridden by derived classes to implement specific behaviour if required. The return codes of the processes are stored in the ''​results''​ attribute, and are ''​None''​ until the processes terminate. At this point, the ''​handle_terminate()''​ method is called, which can be overridden by derived classes to implement specific behaviour if required.
Line 18: Line 18:
 import subprocess import subprocess
 import time import time
- 
  
 class ProcPoller(object):​ class ProcPoller(object):​
Line 24: Line 23:
  
     def __init__(self):​     def __init__(self):​
-        self.fd_map = {} 
         self.results = {}         self.results = {}
-        self.output ​= {} +        self.__fd_map ​= {} 
-        self.closed_procs ​= set() +        self.__procs = {} 
-        self.poller ​= select.poll()+        self.__closed_procs ​= set() 
 +        self.__poller ​= select.poll()
  
  
-    def run_command(self,​ cmdline, context): +    def run_command(self,​ cmdline, context, cmd_input=None): 
-        """​Executes the specified command-line."""​+        """​Executes the specified command-line.
  
-        if context in self.results:+        ​By default, stdin of the process is not monitored and will remain 
 +        attached to the controlling terminal. Passing a string for 
 +        cmd_input will cause stdin to be attached to a pipe instead 
 +        and the specified input to be sent to the process. Passing an 
 +        empty string allows attaching stdin without sending any initial 
 +        input. In either case, passing further input can be done later 
 +        with send_input(),​ but if cmd_input was originally passed as None 
 +        this will fail with an exception. 
 +        """​ 
 + 
 +        ​if context in self.__procs:
             raise ValueError("​duplicate context value supplied"​)             raise ValueError("​duplicate context value supplied"​)
 +        popen_args = {}
 +        if cmd_input is not None:
 +            popen_args["​stdin"​] = subprocess.PIPE
         proc = subprocess.Popen(cmdline,​ stdout=subprocess.PIPE,​         proc = subprocess.Popen(cmdline,​ stdout=subprocess.PIPE,​
-                                stderr=subprocess.PIPE,​ close_fds=True) +                                stderr=subprocess.PIPE,​ close_fds=True
-        proc.__context = context +                                **popen_args
-        self.results[context] = None +        ​try: 
-        self.output[context] = ["",​ ""​]+            ​proc.__context = context
  
-        ​for fd in (i.fileno() for i in (proc.stdout,​ proc.stderr)):​ +            self.results[context] = None 
-            self.fd_map[fd] = proc +            self.__procs[context] = [proc, None if cmd_input is None else ""​] 
-            self.poller.register(fd,​ select.POLLIN | select.POLLHUP)+            self.handle_create(self,​ context) 
 + 
 +            ​for fd in (i.fileno() for i in (proc.stdout,​ proc.stderr)):​ 
 +                self.__fd_map[fd] = proc 
 +                self.__poller.register(fd,​ select.POLLIN | select.POLLHUP
 + 
 +            if cmd_input is not None: 
 +                self.__poller.register(proc.stdin.fileno(),​ 0) 
 +                self.__fd_map[proc.stdin.fileno()] = proc 
 +                self.send_input(context,​ cmd_input) 
 +        except: 
 +            # In case of problems, make sure we terminate the subprocess, but 
 +            # make sure the exception we re-throw isn't replaced in case the 
 +            # cleanup code throws its own exception. 
 +            try: 
 +                raise 
 +            finally: 
 +                try: 
 +                    proc.kill() 
 +                    proc.wait() 
 +                except: 
 +                    pass 
 + 
 + 
 +    def send_input(self,​ context, cmd_input):​ 
 +        """Send further input to an active process."""
 + 
 +        if context not in self.__procs:​ 
 +            raise KeyError("​context not found"​) 
 +        if self.__procs[context][1] is None: 
 +            raise ValueError("​specified process not open for input"​) 
 +        old_input = bool(self.__procs[context][1]) 
 +        self.__procs[context][1] += cmd_input 
 +        if not old_input and cmd_input:​ 
 +            self.__poller.modify(self.__procs[context][0].stdin.fileno(),​ 
 +                                 ​select.POLLOUT) 
 + 
 + 
 +    def send_signal(self,​ context, sig=signal.SIGTERM):​ 
 +        """​Send signal to specified process."""​ 
 + 
 +        if context not in self.__procs:​ 
 +            raise KeyError("​context not found"​) 
 +        self.__procs[context][0].send_signal(sig)
  
  
Line 53: Line 108:
         timeout = time.time() + timeout if timeout is not None else None         timeout = time.time() + timeout if timeout is not None else None
  
-        while self.fd_map ​or self.closed_procs:+        while self.__fd_map ​or self.__closed_procs:
  
             # While there are processes who have closed file descriptors but             # While there are processes who have closed file descriptors but
             # not yet terminated, check their status at least every 500ms.             # not yet terminated, check their status at least every 500ms.
-            if self.closed_procs:+            if self.__closed_procs:
                 poll_timeout = min(poll_timeout,​ 500)                 poll_timeout = min(poll_timeout,​ 500)
  
-            for fd, events in self.poller.poll(poll_timeout):​ +            for fd, events in self.__poller.poll(poll_timeout):​ 
-                proc = self.fd_map[fd]+                proc = self.__fd_map[fd] 
 +                if events & select.POLLOUT:​ 
 +                    proc_list = self.__procs[proc.__context] 
 +                    if proc_list[1]:​ 
 +                        sent = os.write(fd,​ proc_list[1]) 
 +                        proc_list[1] = proc_list[1][sent:​] 
 +                    if not proc_list[1]:​ 
 +                        self.__poller.modify(fd,​ 0) 
                 if events & select.POLLIN:​                 if events & select.POLLIN:​
                     data = os.read(fd, 4096)                     data = os.read(fd, 4096)
                     if data:                     if data:
-                        ​if (self.handle_output(proc.__context,​ +                        self.handle_output(self, proc.__context,​ 
-                                               ​fd == proc.stderr.fileno(),​ +                                           ​fd == proc.stderr.fileno(),​ data)
-                                               data)): +
-                            proc.send_signal(signal.SIGTERM)+
                 if events & select.POLLHUP:​                 if events & select.POLLHUP:​
-                    self.poller.unregister(fd) +                    self.__poller.unregister(fd) 
-                    del self.fd_map[fd]+                    del self.__fd_map[fd]
                     if not proc.stdout.closed and proc.stdout.fileno() == fd:                     if not proc.stdout.closed and proc.stdout.fileno() == fd:
                         proc.stdout.close()                         proc.stdout.close()
-                    ​else:+                    ​elif not proc.stderr.closed and proc.stderr.fileno() == fd:
                         proc.stderr.close()                         proc.stderr.close()
                     if proc.stdout.closed and proc.stderr.closed:​                     if proc.stdout.closed and proc.stderr.closed:​
-                        self.closed_procs.add(proc)+                        ​if self.__procs[proc.__context][1] is not None: 
 +                            self.__poller.unregister(proc.stdin.fileno()) 
 +                            del self.__fd_map[proc.stdin.fileno()] 
 +                            proc.stdin.close() 
 +                        self.__closed_procs.add(proc)
  
             dead_procs = set()             dead_procs = set()
             try:             try:
-                for proc in self.closed_procs:+                for proc in self.__closed_procs:
                     ret = proc.poll()                     ret = proc.poll()
                     if ret is not None:                     if ret is not None:
                         self.results[proc.__context] = ret                         self.results[proc.__context] = ret
                         dead_procs.add(proc)                         dead_procs.add(proc)
-                        self.handle_terminate(proc.__context)+                        self.handle_terminate(self, proc.__context)
             finally:             finally:
-                self.closed_procs ​-= dead_procs+                self.__closed_procs ​-= dead_procs
  
             if timeout is not None:             if timeout is not None:
Line 97: Line 162:
  
  
-    def handle_output(self,​ context, ​stderr, data): +    def handle_output(self, poller, context, ​is_stderr, data): 
-        """​Derived classes can override to intercept output.+        """​Derived classes can override to intercept output."""​ 
 +        pass
  
-        Returning True from this function will send SIGTERM to the process. 
-        """​ 
  
-        index = 1 if stderr else 0 +    ​def handle_terminate(self, poller, context):
-        self.output[context][index] += data +
-        return False +
- +
- +
-    ​def handle_terminate(self,​ context):+
         """​Derived classes can override to detect terminate."""​         """​Derived classes can override to detect terminate."""​
 +        pass
  
 +
 +    def handle_create(self,​ poller, context):
 +        """​Derived classes can override to hook in to creation."""​
         pass         pass
 +
 +
 +
 +class BufferingProcPoller(ProcPoller):​
 +    """​A sample ProcPoller implementation which simply buffers output."""​
 +
 +    def __init__(self):​
 +        """​Initialise output buffer dict."""​
 +        ​
 +        super(BufferingProcPoller,​ self).__init__()
 +        self.output = {}
 +
 +
 +    def handle_create(self,​ poller, context):
 +        """​Add empty output buffer entries."""​
 +
 +        self.output[context] = ["",​ ""​] ​       ​
 +
 +
 +    def handle_output(self,​ poller, context, is_stderr, data):
 +        """​Buffer up output."""​
 +
 +        index = 1 if is_stderr else 0
 +        self.output[context][index] += data
 +        return False
  
 </​code>​ </​code>​
  
notes/python_process_poller.txt · Last modified: 2013/07/22 12:32 by andy