Author: bugman Date: Fri Mar 23 12:37:27 2012 New Revision: 15613 URL: http://svn.gna.org/viewcvs/relax?rev=15613&view=rev Log: Shifted the run_command_queue() and run_queue() methods from the Multi_processor to Processor class. Modified: 1.3/multi/multi_processor_base.py 1.3/multi/processor.py Modified: 1.3/multi/multi_processor_base.py URL: http://svn.gna.org/viewcvs/relax/1.3/multi/multi_processor_base.py?rev=15613&r1=15612&r2=15613&view=diff ============================================================================== --- 1.3/multi/multi_processor_base.py (original) +++ 1.3/multi/multi_processor_base.py Fri Mar 23 12:37:27 2012 @@ -183,69 +183,6 @@ def return_result_command(self, result_object): raise_unimplemented(self.slave_queue_result) - - - #TODO: move up a level add virtaul send and revieve functions - def run_command_queue(self, queue): - """Process all commands on the queue and wait for completion. - - @param queue: The command queue. - @type queue: list of Command instances - """ - - # This must only be run on the master processor. - self.assert_on_master() - - running_set = set() - idle_set = set([i for i in range(1, self.processor_size()+1)]) - - if self.threaded_result_processing: - result_queue = Threaded_result_queue(self) - else: - result_queue = Immediate_result_queue(self) - - while len(queue) != 0: - - while len(idle_set) != 0: - if len(queue) != 0: - command = queue.pop() - dest = idle_set.pop() - self.master_queue_command(command=command, dest=dest) - running_set.add(dest) - else: - break - - # Loop until the queue of calculations is depleted. - while len(running_set) != 0: - # Get the result. - result = self.master_receive_result() - - # Debugging print out. - if verbosity.level(): - print('\nIdle set: %s' % idle_set) - print('Running set: %s' % running_set) - - # Shift the processor rank to the idle set. - if result.completed: - idle_set.add(result.rank) - running_set.remove(result.rank) - - # Add to the result queue for instant or threaded processing. - result_queue.put(result) - - # Process the threaded results. - if self.threaded_result_processing: - result_queue.run_all() - - - #TODO: move up a level - def run_queue(self): - #FIXME: need a finally here to cleanup exceptions states - lqueue = self.chunk_queue(self.command_queue) - self.run_command_queue(lqueue) - - del self.command_queue[:] - self.memo_map.clear() def slave_receive_commands(self): Modified: 1.3/multi/processor.py URL: http://svn.gna.org/viewcvs/relax/1.3/multi/processor.py?rev=15613&r1=15612&r2=15613&view=diff ============================================================================== --- 1.3/multi/processor.py (original) +++ 1.3/multi/processor.py Fri Mar 23 12:37:27 2012 @@ -103,6 +103,7 @@ # multi module imports. from multi.misc import Capturing_exception, raise_unimplemented, Verbosity; verbosity = Verbosity() +from multi.multi_processor_base import Threaded_result_queue from multi.processor_io import Redirect_text from multi.result_commands import Batched_result_command, Null_result_command, Result_exception from multi.slave_commands import Slave_storage_command @@ -536,6 +537,58 @@ self.run_command_queue(queue) + def run_command_queue(self, queue): + """Process all commands on the queue and wait for completion. + + @param queue: The command queue. + @type queue: list of Command instances + """ + + # This must only be run on the master processor. + self.assert_on_master() + + running_set = set() + idle_set = set([i for i in range(1, self.processor_size()+1)]) + + if self.threaded_result_processing: + result_queue = Threaded_result_queue(self) + else: + result_queue = Immediate_result_queue(self) + + while len(queue) != 0: + + while len(idle_set) != 0: + if len(queue) != 0: + command = queue.pop() + dest = idle_set.pop() + self.master_queue_command(command=command, dest=dest) + running_set.add(dest) + else: + break + + # Loop until the queue of calculations is depleted. + while len(running_set) != 0: + # Get the result. + result = self.master_receive_result() + + # Debugging print out. + if verbosity.level(): + print('\nIdle set: %s' % idle_set) + print('Running set: %s' % running_set) + + # Shift the processor rank to the idle set. + if result.completed: + idle_set.add(result.rank) + running_set.remove(result.rank) + + # Add to the result queue for instant or threaded processing. + result_queue.put(result) + + # Process the threaded results. + if self.threaded_result_processing: + result_queue.run_all() + + def run_queue(self): """Run the processor queue - an abstract method. @@ -543,7 +596,12 @@ thread to block until the command has completed. """ - raise_unimplemented(self.run_queue) + #FIXME: need a finally here to cleanup exceptions states + lqueue = self.chunk_queue(self.command_queue) + self.run_command_queue(lqueue) + + del self.command_queue[:] + self.memo_map.clear() def stdio_capture(self):