r15613 - in /1.3/multi: multi_processor_base.py processor.py


Other Months | Index by Date | Thread Index
>>   [Date Prev] [Date Next] [Thread Prev] [Thread Next]

Header


Content

Posted by edward on March 23, 2012 - 12:37:
Author: bugman
Date: Fri Mar 23 12:37:27 2012
New Revision: 15613

URL: http://svn.gna.org/viewcvs/relax?rev=15613&view=rev
Log:
Shifted the run_command_queue() and run_queue() methods from the 
Multi_processor class to the Processor class.


Modified:
    1.3/multi/multi_processor_base.py
    1.3/multi/processor.py

Modified: 1.3/multi/multi_processor_base.py
URL: http://svn.gna.org/viewcvs/relax/1.3/multi/multi_processor_base.py?rev=15613&r1=15612&r2=15613&view=diff
==============================================================================
--- 1.3/multi/multi_processor_base.py (original)
+++ 1.3/multi/multi_processor_base.py Fri Mar 23 12:37:27 2012
@@ -183,69 +183,6 @@
 
     def return_result_command(self, result_object):
         raise_unimplemented(self.slave_queue_result)
-
-
-    #TODO: move up a level add virtaul send and revieve functions
-    def run_command_queue(self, queue):
-        """Process all commands on the queue and wait for completion.
-
-        @param queue:   The command queue.
-        @type queue:    list of Command instances
-        """
-
-        # This must only be run on the master processor.
-        self.assert_on_master()
-
-        running_set = set()
-        idle_set = set([i for i in range(1, self.processor_size()+1)])
-
-        if self.threaded_result_processing:
-            result_queue = Threaded_result_queue(self)
-        else:
-            result_queue = Immediate_result_queue(self)
-
-        while len(queue) != 0:
-
-            while len(idle_set) != 0:
-                if len(queue) != 0:
-                    command = queue.pop()
-                    dest = idle_set.pop()
-                    self.master_queue_command(command=command, dest=dest)
-                    running_set.add(dest)
-                else:
-                    break
-
-            # Loop until the queue of calculations is depleted.
-            while len(running_set) != 0:
-                # Get the result.
-                result = self.master_receive_result()
-
-                # Debugging print out.
-                if verbosity.level():
-                    print('\nIdle set:    %s' % idle_set)
-                    print('Running set: %s' % running_set)
-
-                # Shift the processor rank to the idle set.
-                if result.completed:
-                    idle_set.add(result.rank)
-                    running_set.remove(result.rank)
-
-                # Add to the result queue for instant or threaded processing.
-                result_queue.put(result)
-
-        # Process the threaded results.
-        if self.threaded_result_processing:
-            result_queue.run_all()
-
-
-    #TODO: move up a level
-    def run_queue(self):
-        #FIXME: need a finally here to cleanup exceptions states
-         lqueue = self.chunk_queue(self.command_queue)
-         self.run_command_queue(lqueue)
-
-         del self.command_queue[:]
-         self.memo_map.clear()
 
 
     def slave_receive_commands(self):

Modified: 1.3/multi/processor.py
URL: http://svn.gna.org/viewcvs/relax/1.3/multi/processor.py?rev=15613&r1=15612&r2=15613&view=diff
==============================================================================
--- 1.3/multi/processor.py (original)
+++ 1.3/multi/processor.py Fri Mar 23 12:37:27 2012
@@ -103,6 +103,7 @@
 
 # multi module imports.
 from multi.misc import Capturing_exception, raise_unimplemented, Verbosity; verbosity = Verbosity()
+from multi.multi_processor_base import Threaded_result_queue
 from multi.processor_io import Redirect_text
 from multi.result_commands import Batched_result_command, Null_result_command, Result_exception
 from multi.slave_commands import Slave_storage_command
@@ -536,6 +537,58 @@
         self.run_command_queue(queue)
 
 
+    def run_command_queue(self, queue):
+        """Process all commands on the queue and wait for completion.
+
+        @param queue:   The command queue.
+        @type queue:    list of Command instances
+        """
+
+        # This must only be run on the master processor.
+        self.assert_on_master()
+
+        running_set = set()
+        idle_set = set([i for i in range(1, self.processor_size()+1)])
+
+        if self.threaded_result_processing:
+            result_queue = Threaded_result_queue(self)
+        else:
+            result_queue = Immediate_result_queue(self)
+
+        while len(queue) != 0:
+
+            while len(idle_set) != 0:
+                if len(queue) != 0:
+                    command = queue.pop()
+                    dest = idle_set.pop()
+                    self.master_queue_command(command=command, dest=dest)
+                    running_set.add(dest)
+                else:
+                    break
+
+            # Loop until the queue of calculations is depleted.
+            while len(running_set) != 0:
+                # Get the result.
+                result = self.master_receive_result()
+
+                # Debugging print out.
+                if verbosity.level():
+                    print('\nIdle set:    %s' % idle_set)
+                    print('Running set: %s' % running_set)
+
+                # Shift the processor rank to the idle set.
+                if result.completed:
+                    idle_set.add(result.rank)
+                    running_set.remove(result.rank)
+
+                # Add to the result queue for instant or threaded processing.
+                result_queue.put(result)
+
+        # Process the threaded results.
+        if self.threaded_result_processing:
+            result_queue.run_all()
+
+
     def run_queue(self):
         """Run the processor queue - an abstract method.
 
@@ -543,7 +596,12 @@
         thread to block until the command has completed.
         """
 
-        raise_unimplemented(self.run_queue)
+        #FIXME: need a finally here to cleanup exceptions states
+        lqueue = self.chunk_queue(self.command_queue)
+        self.run_command_queue(lqueue)
+
+        del self.command_queue[:]
+        self.memo_map.clear()
 
 
     def stdio_capture(self):




Related Messages


Powered by MHonArc, Updated Fri Mar 23 13:00:02 2012