Author: varioustoxins Date: Thu Mar 22 09:20:55 2007 New Revision: 3239 URL: http://svn.gna.org/viewcvs/relax?rev=3239&view=rev Log: multi processor fixes for proper command queuing with optional load balancing, exception support and segmented results Modified: branches/multi_processor/multi/mpi4py_processor.py branches/multi_processor/relax Modified: branches/multi_processor/multi/mpi4py_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3239&r1=3238&r2=3239&view=diff ============================================================================== --- branches/multi_processor/multi/mpi4py_processor.py (original) +++ branches/multi_processor/multi/mpi4py_processor.py Thu Mar 22 09:20:55 2007 @@ -1,7 +1,9 @@ #!/usr/bin/env python +#TODO clone communicators & resize import sys import os +import math # load mpi @@ -15,27 +17,80 @@ if MPI.rank == 0: _sys_exit = sys.exit + +def rank_format_string(): + digits = math.ceil(math.log10(MPI.size)) + format = '%%%di' % digits + return format + +RANK_FORMAT_STRING = rank_format_string + # wrapper sys.exit function def exit(status=None): exit_mpi() _sys_exit(status) +def broadcast_command(command): + for i in range(1,MPI.size): + if i != 0: + MPI.COMM_WORLD.Send(buf=command,dest=i) + +def ditch_all_results(): + for i in range(1,MPI.size): + if i != 0: + while 1: + result = MPI.COMM_WORLD.Recv(source=i) + if result.completed: + break def exit_mpi(): if MPI.Is_initialized() and not MPI.Is_finalized() and MPI.rank == 0: - sendbuf = Exit_command() - for i in range(MPI.size): - if i != 0: - MPI.COMM_WORLD.Send(buf=sendbuf,dest=i) + broadcast_command(Exit_command()) + ditch_all_results() + + +class Result(object): + def __init__(self): + self.rank=MPI.rank + +class Result_string(Result): + #FIXME move result up a level + def __init__(self,string,completed): + super(Result_string,self).__init__() + self.string=string + self.completed=completed + +class Result_command(Result): + def 
__init__(self,completed): + super(Result_command,self).__init__() + self.completed=completed + + def run(self,relax,processor): + pass + +class Null_result_command(Result_command): + def __init__(self): + super(Null_result_command,self).__init__(completed=True) + +NULL_RESULT=Null_result_command() + +class Slave_command(object): + def run(self,processor): + pass #FIXME do some inheritance -class Exit_command(object): - def run(self,relax,processor): + +class Exit_command(Slave_command): + def run(self,processor): + processor.return_object(NULL_RESULT) processor.do_quit=True -class Get_name_command(object): - def run(self,relax,processor): - result = '%s-%s' % (MPI.Get_processor_name(),os.getpid()) + + +class Get_name_command(Slave_command): + def run(self,processor): + msg = processor.get_name() + result = Result_string(msg,True) processor.return_object(result) #FIXME do some inheritance @@ -50,25 +105,78 @@ sys.exit= exit self.do_quit=False + def assert_on_master(self): + if MPI.rank != 0: + msg = 'running on slave when expected master with MPI.rank == 0, rank was %d'% MPI.rank + raise Exception(msg) + + + def get_name(self): + return '%s-%s' % (MPI.Get_processor_name(),os.getpid()) + def exit(self): exit_mpi() def return_object(self,result): MPI.COMM_WORLD.Send(buf=result, dest=0) - def run_command(self,command): + def run_command_queue(self,commands): + self.assert_on_master() + for i in range(1,MPI.size): - if i != 0: - MPI.COMM_WORLD.Send(buf=command,dest=i) - for i in range(1,MPI.size): - buf=[] - if i !=0: - elem = MPI.COMM_WORLD.Recv(source=i) - if type(elem) == 'object': - elem.run(relax_instance, relax_instance.processor) + MPI.COMM_WORLD.Send(buf=command,dest=i) + + def run_command_globally(self,command): + queue = [command for i in range(1,MPI.size)] + self.run_command_queue(queue) + + def run_command_queue(self,queue): + self.assert_on_master() + +# for i in range(1,MPI.size): +# MPI.COMM_WORLD.Send(buf=command,dest=i) +# for i in range(1,MPI.size): +# 
elem = MPI.COMM_WORLD.Recv(source=i) +# if type(elem) == 'object': +# elem.run(relax_instance, relax_instance.processor) +# else: +# #FIXME can't cope with multiple lines +# print i,elem + #queue = [command for i in range(1,MPI.size*2)] + + running_set=set() + idle_set=set([i for i in range(1,MPI.size)]) + + while len(queue) != 0: + + while len(idle_set) != 0: + if len(queue) != 0: + command = queue.pop() + dest=idle_set.pop() + MPI.COMM_WORLD.Send(buf=command,dest=dest) + running_set.add(dest) else: - #FIXME can't cope with multiple lines - print i,elem + break + + + while len(running_set) !=0: + result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) + if isinstance(result, Exception): + raise result + + if isinstance(result, Result): + if result.completed: + idle_set.add(result.rank) + running_set.remove(result.rank) + + if isinstance(result, Result_command): + result.run(self.relax,self) + elif isinstance(result, Result_string): + #FIXME can't cope with multiple lines + print result.rank,result.string + else: + message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) + raise Exception(message) # for i in range(MPI.size): @@ -84,14 +192,14 @@ def run(self): - if MPI.rank == 0: - self.relax_instance.multi_mode='multi_master' - else: - self.relax_instance.multi_mode='multi_slave' - self.relax_instance.mode='slave' - self.relax_instance.script_file=None - self.relax_instance.dummy_mode=True - self.relax_instance.run() +# if MPI.rank == 0: +# self.relax_instance.multi_mode='multi_master' +# else: +# self.relax_instance.multi_mode='multi_slave' +# self.relax_instance.mode='slave' +# self.relax_instance.script_file=None +# self.relax_instance.dummy_mode=True +# #self.relax_instance.run() if MPI.rank ==0: @@ -101,7 +209,11 @@ #self.relax_instance.run(deamon=True) while not self.do_quit: command = MPI.COMM_WORLD.Recv(source=0) - command.run(self.relax_instance, self.relax_instance.processor) + try: + command.run(self) + except Exception,e: + 
self.return_object(e) + #if data=='close': Modified: branches/multi_processor/relax URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/relax?rev=3239&r1=3238&r2=3239&view=diff ============================================================================== --- branches/multi_processor/relax (original) +++ branches/multi_processor/relax Thu Mar 22 09:20:55 2007 @@ -149,12 +149,14 @@ #FIXME use self.mode all over mode = self.mode + print mode # Show the version number and exit. if mode == 'version': print 'relax ' + self.version sys.exit() + # FIXME threading # Logging. if self.log_file and mode != 'thread': self.IO.log(self.log_file) @@ -171,7 +173,7 @@ # Run the interpreter. self.interpreter = Interpreter(self) self.interpreter.run() - print 'exit' + elif mode == 'slave': self.interpreter = Interpreter(self) @@ -534,14 +536,14 @@ module = None result = None - try: - module = __import__(module_path,globals(), locals(), []) - if verbose: - print 'loaded module %s' % module_path - except Exception, e: - if verbose: - print 'failed to load module_path %s' % module_path - print 'exception:',e + #try: + module = __import__(module_path,globals(), locals(), []) + if verbose: + print 'loaded module %s' % module_path + #except Exception, e: + # if verbose: + # print 'failed to load module_path %s' % module_path + # print 'exception:',e #FIXME: needs more failure checking if module != None: @@ -562,6 +564,7 @@ modules = import_module(module_path) #print modules + if hasattr(modules[-1],class_name): clazz = getattr(modules[-1], class_name) else: