Author: varioustoxins Date: Sat Apr 21 03:17:26 2007 New Revision: 3269 URL: http://svn.gna.org/viewcvs/relax?rev=3269&view=rev Log: batched retrun results and better sys.exit and exception handling behaviour Modified: branches/multi_processor/multi/PrependStringIO.py branches/multi_processor/multi/commands.py branches/multi_processor/multi/mpi4py_processor.py branches/multi_processor/multi/processor.py Modified: branches/multi_processor/multi/PrependStringIO.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/PrependStringIO.py?rev=3269&r1=3268&r2=3269&view=diff ============================================================================== --- branches/multi_processor/multi/PrependStringIO.py (original) +++ branches/multi_processor/multi/PrependStringIO.py Sat Apr 21 03:17:26 2007 @@ -1,7 +1,6 @@ from StringIO import StringIO import sys -# these may need to be in c they cause an pprox 10% slowdown class PrependOut(StringIO): @@ -21,6 +20,9 @@ self.stream.write(string) #self.truncate(0) + # lost more functions needed use dict??? + def isatty(self,*args,**kwargs): + return stream.isatty(*args,**kwargs) # def flush(self): # self.stream.write(self.getvalue().rstrip(self.token)) # self.truncate(0) Modified: branches/multi_processor/multi/commands.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/commands.py?rev=3269&r1=3268&r2=3269&view=diff ============================================================================== --- branches/multi_processor/multi/commands.py (original) +++ branches/multi_processor/multi/commands.py Sat Apr 21 03:17:26 2007 @@ -54,6 +54,19 @@ msg = processor.get_name() result = Result_string(msg,completed) processor.return_object(result) + +class Set_processor_property_command(Slave_command): + def __init__(self,property_map): + super(Set_processor_property_command,self).__init__() + self.property_map = property_map + + def run(self,processor,completed): + for property,value in self.property_map.items(): + try: + setattr(processor, property, value) + processor.return_object(NULL_RESULT) + except Exception, e: + processor.return_object(e) @@ -80,6 +93,8 @@ OFFSET_SHORT_MIN_PARAMS=0 OFFSET_SHORT_FK=1 OFFSET_SHORT_K=2 + + @@ -240,8 +255,8 @@ # add debug flag or extra channels that output immediately if processor.processor_size() > 1: pre_string = processor.rank_format_string() % processor.rank() - stderr_string = ' E>' - stdout_string = ' S>' + stderr_string = ' E> ' + stdout_string = ' S> ' else: pre_string = '' stderr_string = '' Modified: branches/multi_processor/multi/mpi4py_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3269&r1=3268&r2=3269&view=diff ============================================================================== --- branches/multi_processor/multi/mpi4py_processor.py (original) +++ branches/multi_processor/multi/mpi4py_processor.py Sat Apr 21 03:17:26 2007 @@ -31,11 +31,12 @@ import traceback from multi.processor import Processor,Memo,Slave_command -from multi.processor import Result,Result_command,Result_string +from multi.processor import Result,Result_command,Result_string,Result_exception from multi.commands import Exit_command from copy import copy from multi.processor import Capturing_exception + @@ -60,8 +61,7 @@ sys.exit() # save original sys.exit to call after wrapper -if MPI.rank == 0: - _sys_exit = sys.exit +_sys_exit = sys.exit #FIXME: delete me #def rank_format_string(): @@ -72,10 +72,16 @@ #RANK_FORMAT_STRING = rank_format_string # wrapper sys.exit function +# CHECKME is status ok def exit(status=None): - exit_mpi() - _sys_exit(status) + if MPI.rank != 0: + raise Exception('sys.exit unexpectedley called on slave!') + else: + #print 'here' + exit_mpi() + #MPI.COMM_WORLD.Abort(1) + _sys_exit(status) def broadcast_command(command): for i in range(1,MPI.size): @@ -95,6 +101,24 @@ ditch_all_results() +class Batched_result_command(Result_command): + + def __init__(self,result_commands,completed=True): + super(Batched_result_command,self).__init__(completed=completed) + self.result_commands=result_commands + + + def run(self,relax,processor,batched_memo): + + processor.assert_on_master() + if batched_memo != None: + msg = "batched result commands shouldn't have memo values, memo: " + `batched_memo` + raise ValueError(msg) + + for result_command in self.result_commands: + processor.process_result(result_command) + + @@ -116,6 +140,12 @@ self.command_queue=[] self.memo_map={} + self.batched_returns=True + self.result_list=None + + def abort(self): + MPI.Finalize() + MPI.COMM_WORLD.Abort() def add_to_queue(self,command,memo=None): self.command_queue.append(command) @@ -163,62 +193,77 @@ def get_name(self): return '%s-%s' % (MPI.Get_processor_name(),os.getpid()) + # CHECKME am i used def exit(self): exit_mpi() def return_object(self,result): - result.rank=MPI.rank - MPI.COMM_WORLD.Send(buf=result, dest=0) - - #FIXME: fill out - def process_result(self): - pass + result_object = None + #raise Exception('dummy') + if self.batched_returns: + is_batch_result = isinstance(result, Batched_result_command) + + + if is_batch_result: + result_object = result + else: + if self.result_list != None: + self.result_list.append(result) + else: + result_object=result + + + if result_object != None: + #FIXME check is used? + result_object.rank=MPI.rank + MPI.COMM_WORLD.Send(buf=result_object, dest=0) + + #FIXME: fill out generic result processing move to processor + def process_result(self,result): + + if isinstance(result, Result): + + if isinstance(result, Result_command): + memo=None + if result.memo_id != None: + memo=self.memo_map[result.memo_id] + result.run(self.relax_instance,self,memo) + if result.memo_id != None and result.completed: + del self.memo_map[result.memo_id] + + elif isinstance(result, Result_string): + #FIXME can't cope with multiple lines + self.save_stdout.write(result.string), + else: + message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) + raise Exception(message) def run_command_queue(self,queue): - self.assert_on_master() - - running_set=set() - idle_set=set([i for i in range(1,MPI.size)]) - - while len(queue) != 0: - - while len(idle_set) != 0: - if len(queue) != 0: - command = queue.pop() - dest=idle_set.pop() - MPI.COMM_WORLD.Send(buf=command,dest=dest) - running_set.add(dest) - else: - break - - - while len(running_set) !=0: - result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) - if isinstance(result, Exception): - #FIXME: clear command queue - # and finalise mpi (or restart it if we can! - # also tracebacks are no good - raise result - - if isinstance(result, Result): + self.assert_on_master() + + running_set=set() + idle_set=set([i for i in range(1,MPI.size)]) + + while len(queue) != 0: + + while len(idle_set) != 0: + if len(queue) != 0: + command = queue.pop() + dest=idle_set.pop() + MPI.COMM_WORLD.Send(buf=command,dest=dest) + running_set.add(dest) + else: + break + + + while len(running_set) !=0: + result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) + print result + if result.completed: idle_set.add(result.rank) running_set.remove(result.rank) - - if isinstance(result, Result_command): - memo=None - if result.memo_id != None: - memo=self.memo_map[result.memo_id] - result.run(self.relax_instance,self,memo) - if result.memo_id != None and result.completed: - del self.memo_map[result.memo_id] - - elif isinstance(result, Result_string): - #FIXME can't cope with multiple lines - self.save_stdout.write(result.string), - else: - message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) - raise Exception(message) + self.process_result(result) @@ -234,26 +279,49 @@ if self.on_master(): - self.pre_run() - self.relax_instance.run() - self.post_run() + try: + self.pre_run() + self.relax_instance.run() + self.post_run() + except Exception,e: + # check me could be moved outside + print e + self.abort() # note this a modified exit that kills all MPI processors sys.exit() else: - - while not self.do_quit: - commands = MPI.COMM_WORLD.Recv(source=0) - - if not isinstance(commands,list): - commands = [commands] - last_command = len(commands)-1 - for i,command in enumerate(commands): - try: + try: + + while not self.do_quit: + commands = MPI.COMM_WORLD.Recv(source=0) + sys.exit() + + if not isinstance(commands,list): + commands = [commands] + last_command = len(commands)-1 + + if self.batched_returns: + self.result_list = [] + else: + self.result_list = None + + for i,command in enumerate(commands): + + #raise Exception('dummy') completed = (i == last_command) command.run(self,completed) - #raise Exception('dummy') - except Exception,e: - #self.return_object(e) - self.return_object(Capturing_exception(rank=self.rank(),name=self.get_name())) - + + + + if self.batched_returns: + self.return_object(Batched_result_command(result_commands=self.result_list)) + self.result_list=None + + except Exception,e: + self.result_list=None + capturing_exception = Capturing_exception(rank=self.rank(),name=self.get_name()) + exception_result = Result_exception(capturing_exception) + exception_result.rank=MPI.rank + MPI.COMM_WORLD.Send(buf=exception_result, dest=0) + Modified: branches/multi_processor/multi/processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/processor.py?rev=3269&r1=3268&r2=3269&view=diff ============================================================================== --- branches/multi_processor/multi/processor.py (original) +++ branches/multi_processor/multi/processor.py Sat Apr 21 03:17:26 2007 @@ -79,6 +79,9 @@ queue = [command for i in range(1,MPI.size)] self.run_command_queue(queue) + def abort(self): + sys.exit() + #FIXME: remname chunk* grain* def __init__(self,relax_instance,chunkyness=1): self.chunkyness = chunkyness @@ -122,9 +125,14 @@ format = '%%%di' % digits return format + + + class Result(object): def __init__(self,completed): self.completed=completed + self.memo_id=None + class Result_string(Result): @@ -147,6 +155,14 @@ super(Null_result_command,self).__init__(completed=completed) NULL_RESULT=Null_result_command() + +class Result_exception(Result_command): + def __init__(self,exception,completed=True): + super(Result_exception,self).__init__(completed=completed) + self.exception=exception + + def run(self,relax,processor,memos): + raise self.exception class Slave_command(object):