Author: bugman Date: Wed Mar 21 14:44:43 2012 New Revision: 15594 URL: http://svn.gna.org/viewcvs/relax?rev=15594&view=rev Log: Shifted the Multi_processor.run() method up a level to Processor.run(). This completes one of the TODOs, and will be needed to avoid code duplication for handling the new data_upload() and data_fetch() API methods. This run() method now works for the uni-processor, as the Uni_processor.on_master() method, which always returns True, has been reactivated, so the Uni_processor.run() method has been deleted. Modified: 1.3/multi/multi_processor_base.py 1.3/multi/processor.py 1.3/multi/uni_processor.py Modified: 1.3/multi/multi_processor_base.py URL: http://svn.gna.org/viewcvs/relax/1.3/multi/multi_processor_base.py?rev=15594&r1=15593&r2=15594&view=diff ============================================================================== --- 1.3/multi/multi_processor_base.py (original) +++ 1.3/multi/multi_processor_base.py Wed Mar 21 14:44:43 2012 @@ -42,8 +42,8 @@ import traceback # relax module imports. 
-from multi.api import Batched_result_command, Result, Result_command, Result_string, Result_exception -from multi.misc import Capturing_exception, raise_unimplemented, Verbosity; verbosity = Verbosity() +from multi.api import Batched_result_command, Result, Result_command, Result_exception, Result_string +from multi.misc import raise_unimplemented, Verbosity; verbosity = Verbosity() from multi.processor import Processor @@ -183,57 +183,6 @@ def return_result_command(self, result_object): raise_unimplemented(self.slave_queue_result) - - - #TODO: move up a level and add virtual send and recieve - def run(self): - self.pre_run() - if self.on_master(): - try: - self.callback.init_master(self) - - except Exception, e: - self.callback.handle_exception(self, e) - - else: - while not self.do_quit: - try: - commands = self.slave_recieve_commands() - if not isinstance(commands, list): - commands = [commands] - last_command = len(commands)-1 - - if self.batched_returns: - self.result_list = [] - else: - self.result_list = None - - for i, command in enumerate(commands): - # Capture the standard IO streams for the slaves. - self.stdio_capture() - - # Execute the calculation. - completed = (i == last_command) - command.run(self, completed) - - # Restore the IO. 
- self.stdio_restore() - - if self.batched_returns: - self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data)) - self.result_list = None - - except: - capturing_exception = Capturing_exception(rank=self.rank(), name=self.get_name()) - exception_result = Result_exception(exception=capturing_exception, processor=self, completed=True) - - self.return_object(exception_result) - self.result_list = None - - self.post_run() - if self.on_master(): - # note this a modified exit that kills all MPI processors - sys.exit() #TODO: move up a level add virtaul send and revieve functions Modified: 1.3/multi/processor.py URL: http://svn.gna.org/viewcvs/relax/1.3/multi/processor.py?rev=15594&r1=15593&r2=15594&view=diff ============================================================================== --- 1.3/multi/processor.py (original) +++ 1.3/multi/processor.py Wed Mar 21 14:44:43 2012 @@ -102,8 +102,8 @@ import time, datetime, math, sys # relax module imports. -from multi.api import Null_result_command -from multi.misc import raise_unimplemented, Verbosity; verbosity = Verbosity() +from multi.api import Batched_result_command, Null_result_command, Result_exception +from multi.misc import Capturing_exception, raise_unimplemented, Verbosity; verbosity = Verbosity() from multi.processor_io import Redirect_text @@ -422,14 +422,58 @@ def run(self): """Run the processor - an abstract method. - This function runs the processor main loop and is called after all processor setup has been - completed. It does remote execution setup and teardown round either side of a call to - Application_callback.init_master. + This function runs the processor main loop and is called after all processor setup has been completed. It does remote execution setup and teardown round either side of a call to Application_callback.init_master. @see: multi.processor.Application_callback. 
""" - raise_unimplemented(self.run) + self.pre_run() + if self.on_master(): + try: + self.callback.init_master(self) + + except Exception, e: + self.callback.handle_exception(self, e) + + else: + while not self.do_quit: + try: + commands = self.slave_recieve_commands() + if not isinstance(commands, list): + commands = [commands] + last_command = len(commands)-1 + + if self.batched_returns: + self.result_list = [] + else: + self.result_list = None + + for i, command in enumerate(commands): + # Capture the standard IO streams for the slaves. + self.stdio_capture() + + # Execute the calculation. + completed = (i == last_command) + command.run(self, completed) + + # Restore the IO. + self.stdio_restore() + + if self.batched_returns: + self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data)) + self.result_list = None + + except: + capturing_exception = Capturing_exception(rank=self.rank(), name=self.get_name()) + exception_result = Result_exception(exception=capturing_exception, processor=self, completed=True) + + self.return_object(exception_result) + self.result_list = None + + self.post_run() + if self.on_master(): + # note this a modified exit that kills all MPI processors + sys.exit() def run_command_globally(self, command): Modified: 1.3/multi/uni_processor.py URL: http://svn.gna.org/viewcvs/relax/1.3/multi/uni_processor.py?rev=15594&r1=15593&r2=15594&view=diff ============================================================================== --- 1.3/multi/uni_processor.py (original) +++ 1.3/multi/uni_processor.py Wed Mar 21 14:44:43 2012 @@ -81,8 +81,15 @@ return '%s-%s' % (os.getenv('HOSTNAME'), os.getpid()) -# def on_master(self): -# return True + def on_master(self): + """For the uni-processor fabric, we are always on the master. + + @return: The flag specifying if we are on the master or slave processors. + @rtype: bool + """ + + # Always master. 
+ return True def post_run(self): @@ -130,15 +137,6 @@ raise Exception(message) - def run(self): - try: - self.pre_run() - self.callback.init_master(self) - self.post_run() - except Exception, e: - self.callback.handle_exception(self, e) - - def run_queue(self): #FIXME: need a finally here to cleanup exceptions states for windows etc