mailr15395 - in /1.3/multi: multi_processor_base.py processor.py processor_io.py


Others Months | Index by Date | Thread Index
>>   [Date Prev] [Date Next] [Thread Prev] [Thread Next]

Header


Content

Posted by edward on February 28, 2012 - 18:51:
Author: bugman
Date: Tue Feb 28 18:51:09 2012
New Revision: 15395

URL: http://svn.gna.org/viewcvs/relax?rev=15395&view=rev
Log:
Big fix for the IO streams when running in the multi-processor mpi4py mode!

This affects all multi-processor implementations, excluding the standard 
uni-processor.  The entire
stream capture design has been overhauled.  The IO streams of the master 
processor are no longer
touched, whereas those of the slaves are captured, stored in a list of lists 
where the original
'S  |' and 'S E|' tokens are prepended to each line as well as a number 
identifying the stream,
stored in the results command object, and unpacked and sent to sys.stdout and 
sys.stderr in the
correct order by the master.

This fixes the implementation for both running on the GUI (the slaves are no 
longer dumping text to
the terminal where the GUI was launched from), and on clusters where the IO 
is dumped on the
processing node rather than back on the master node.


Modified:
    1.3/multi/multi_processor_base.py
    1.3/multi/processor.py
    1.3/multi/processor_io.py

Modified: 1.3/multi/multi_processor_base.py
URL: 
http://svn.gna.org/viewcvs/relax/1.3/multi/multi_processor_base.py?rev=15395&r1=15394&r2=15395&view=diff
==============================================================================
--- 1.3/multi/multi_processor_base.py (original)
+++ 1.3/multi/multi_processor_base.py Tue Feb 28 18:51:09 2012
@@ -1,7 +1,7 @@
 
###############################################################################
 #                                                                            
 #
 # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)   
 #
-# Copyright (C) 2011 Edward d'Auvergne                                       
 #
+# Copyright (C) 2011-2012 Edward d'Auvergne                                  
 #
 #                                                                            
 #
 # This file is part of the program relax.                                    
 #
 #                                                                            
 #
@@ -46,13 +46,36 @@
 
 
 class Batched_result_command(Result_command):
-    def __init__(self, processor, result_commands, completed=True):
+    def __init__(self, processor, result_commands, io_data=None, 
completed=True):
         super(Batched_result_command, self).__init__(processor=processor, 
completed=completed)
         self.result_commands = result_commands
 
+        # Store the IO data to print out via the run() method called by the 
master.
+        self.io_data = io_data
+
 
     def run(self, processor, batched_memo):
+        """The results command to be run by the master.
+
+        @param processor:       The processor instance.
+        @type processor:        Processor instance
+        @param batched_memo:    The batched memo object.
+        @type batched_memo:     Memo instance
+        """
+
+        # First check that we are on the master.
         processor.assert_on_master()
+
+        # Unravel the IO stream data on the master in the correct order.
+        for line, stream in self.io_data:
+            if stream == 0:
+                sys.stdout.write(line)
+            else:
+                sys.stderr.write(line)
+
+        if batched_memo != None:
+            msg = "batched result commands shouldn't have memo values, memo: 
" + repr(batched_memo)
+
         if batched_memo != None:
             msg = "batched result commands shouldn't have memo values, memo: 
" + repr(batched_memo)
             raise ValueError(msg)
@@ -154,9 +177,6 @@
 
         # Execute the base class method.
         super(Multi_processor, self).pre_run()
-
-        # Capture the standard IO streams for the master and slaves.
-        self.capture_stdio()
 
 
     #FIXME: fill out generic result processing move to processor
@@ -230,13 +250,18 @@
                         self.result_list = None
 
                     for i, command in enumerate(commands):
-
-                        #raise Exception('dummy')
+                        # Capture the standard IO streams for the slaves.
+                        self.stdio_capture()
+
+                        # Execute the calculation.
                         completed = (i == last_command)
                         command.run(self, completed)
 
+                        # Restore the IO.
+                        self.stdio_restore()
+
                     if self.batched_returns:
-                        
self.return_object(Batched_result_command(processor=self, 
result_commands=self.result_list))
+                        
self.return_object(Batched_result_command(processor=self, 
result_commands=self.result_list, io_data=self.io_data))
                         self.result_list = None
 
                 except:

Modified: 1.3/multi/processor.py
URL: 
http://svn.gna.org/viewcvs/relax/1.3/multi/processor.py?rev=15395&r1=15394&r2=15395&view=diff
==============================================================================
--- 1.3/multi/processor.py (original)
+++ 1.3/multi/processor.py Tue Feb 28 18:51:09 2012
@@ -1,7 +1,7 @@
 
###############################################################################
 #                                                                            
 #
 # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)   
 #
-# Copyright (C) 2011 Edward d'Auvergne                                       
 #
+# Copyright (C) 2011-2012 Edward d'Auvergne                                  
 #
 #                                                                            
 #
 # This file is part of the program relax.                                    
 #
 #                                                                            
 #
@@ -103,7 +103,7 @@
 import traceback, textwrap
 
 # relax module imports.
-from multi.processor_io import PrependStringIO, IO_filter
+from multi.processor_io import IO_filter, Redirect_text
 from relax_errors import RelaxError
 
 
@@ -419,9 +419,6 @@
         self._processor_size = processor_size
         '''Number of slave processors available in this processor.'''
 
-        # Capture the STDIO.
-        self.std_stdio_capture()
-
 
     def abort(self):
         '''Shutdown the multi processor in exceptional conditions - designed 
for overriding.
@@ -462,32 +459,6 @@
         raise_unimplemented(self.add_to_queue)
 
 
-    def capture_stdio(self, stdio_capture=None):
-        '''Enable capture of the STDOUT and STDERR by self.stdio_capture or 
user supplied streams.
-
-        @note:  Both or neither stream has to be replaced you can't just 
replace one!
-
-        @keyword stdio_capture: A pair of file like objects used to replace 
sys.stdout and sys.stderr respectively.
-        @type stdio_capture:    list of two file-like objects
-        '''
-
-        # Store the original STDOUT and STDERR for restoring later on.
-        self.orig_stdout = sys.stdout
-        self.orig_stderr = sys.stderr
-
-        # Default to self.stdio_capture if stdio_capture is not supplied.
-        if stdio_capture == None:
-            stdio_capture = self.stdio_capture
-
-        # First flush.
-        sys.stdout.flush()
-        sys.stderr.flush()
-
-        # Then redirect IO.
-        sys.stdout = stdio_capture[0]
-        sys.stderr = stdio_capture[1]
-
-
     # FIXME is this used?
 #    def exit(self):
 #        raise_unimplemented(self.exit)
@@ -523,16 +494,6 @@
         '''
 
         raise_unimplemented(self.get_name)
-
-
-    def get_stdio_capture(self):
-        '''Get the file like objects currently replacing sys.stdout and 
sys.stderr.
-
-        @return:    The file like objects currently replacing sys.stdout and 
sys.stderr.
-        @rtype:     tuple of two file-like objects
-        '''
-
-        return self.stdio_capture
 
 
     def get_stdio_pre_strings(self):
@@ -713,36 +674,34 @@
         raise_unimplemented(self.run_queue)
 
 
-    def std_stdio_capture(self, pre_strings=None):
-        '''Get the default sys.stdout and sys.stderr replacements.
-
-        On the master the replacement prepend output with 'MM S]' or MM E]' 
for the STDOUT and STDERR channels respectively on slaves the outputs are 
replaced by StringIO objects that prepend 'NN S]' or NN E]' for STDOUT and 
STDERR where NN is the rank of the processor.
-
-        @note:  By default STDOUT and STDERR are conjoined as otherwise the 
context of STDOUT and STDERR messages are lost.
-        @todo:  Improve segregation of sys.sdout and sys.stderr.
-
-        @keyword pre_strings:   Pre strings for the sys.stdout and 
sys.stderr channels.
-        @type pre_strings:      list of 2 str
-        @return:                File like objects to replace STDOUT and 
STDERR respectively in order.
-        @rtype:                 tuple of two file-like objects
-        '''
+    def stdio_capture(self):
+        """Enable capture of the STDOUT and STDERR.
+        
+        This is currently used to capture the IO streams of the slaves to 
return back to the master.
+        """
+
+        # Store the original STDOUT and STDERR for restoring later on.
+        self.orig_stdout = sys.stdout
+        self.orig_stderr = sys.stderr
+
+        # The data object.
+        self.io_data = []
 
         # Get the strings to prepend to the IO streams.
-        if pre_strings == None:
-            pre_strings = self.get_stdio_pre_strings()
-
-        # The master processor.
-        if self.rank() == 0:
-            stdout_capture = IO_filter(pre_strings[0], sys.stdout)
-            stderr_capture = IO_filter(pre_strings[1], sys.stderr)
-
-        # The slaves.
-        else:
-            stdout_capture = PrependStringIO(pre_strings[0])
-            stderr_capture = PrependStringIO(pre_strings[1], 
stream=stdout_capture)
-
-        # Store the captured IO streams.
-        self.stdio_capture = (stdout_capture, stderr_capture)
+        pre_strings = self.get_stdio_pre_strings()
+
+        # Then redirect IO.
+        sys.stdout = Redirect_text(self.io_data, token=pre_strings[0], 
stream=0)
+        sys.stderr = Redirect_text(self.io_data, token=pre_strings[1], 
stream=1)
+
+
+    def stdio_restore(self):
+        """Restore the original STDOUT and STDERR streams."""
+
+        # Restore the original streams.
+        sys.stdout = self.orig_stdout
+        sys.stderr = self.orig_stderr
+
 
 
 class Processor_box(object):
@@ -901,7 +860,6 @@
     def __init__(self, processor, string, completed):
         '''Initialiser.
 
-        @see:   multi.processor.Processor.std_stdio_capture.
         @todo:  Check inherited parameters are documented.
 
         @param string:  A string to return the master processor for output 
to STDOUT (note the

Modified: 1.3/multi/processor_io.py
URL: 
http://svn.gna.org/viewcvs/relax/1.3/multi/processor_io.py?rev=15395&r1=15394&r2=15395&view=diff
==============================================================================
--- 1.3/multi/processor_io.py (original)
+++ 1.3/multi/processor_io.py Tue Feb 28 18:51:09 2012
@@ -1,6 +1,7 @@
 
###############################################################################
 #                                                                            
 #
 # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)   
 #
+# Copyright (C) 2012 Edward d'Auvergne                                       
 #
 #                                                                            
 #
 # This file is part of the program relax.                                    
 #
 #                                                                            
 #
@@ -121,3 +122,44 @@
         # Flush both STDOUT and STDERR.
         sys.stdout.flush()
         sys.stderr.flush()
+
+
+
+class Redirect_text(object):
+    """Store the data of the IO streams, prepending a token to each line of 
written text."""
+
+    def __init__(self, data, token='', stream=0):
+        """Set up the text redirection object.
+
+        @param data:        The data object to store all IO in.
+        @type data:         list of lists
+        @param token:       The string to add to the end of all newlines.
+        @type token:        str
+        @keyword stream:    The type of steam (0 for STDOUT and 1 for 
STDERR).
+        @type stream:       int
+        """
+
+        # Store the args.
+        self.data = data
+        self.token = token
+        self.stream = stream
+
+
+    def flush(self):
+        """Dummy flush method."""
+
+
+    def write(self, string):
+        """Replacement write() method.
+        
+        This prepends the token to each line of STDOUT and STDERR and stores 
the result together with the stream number.
+
+        @param string:  The text to write.
+        @type string:   str
+        """
+
+        # Append the token to all newline chars.
+        string = string.replace('\n', '\n' + self.token)
+
+        # Store the text.
+        self.data.append([string, self.stream])




Related Messages


Powered by MHonArc, Updated Wed Feb 29 11:00:02 2012