Author: bugman Date: Thu Oct 16 00:24:12 2008 New Revision: 7736 URL: http://svn.gna.org/viewcvs/relax?rev=7736&view=rev Log: Manually ported r3269, r3270, and r3271 from the multi_processor branch. The command used was: svn merge -r3268:3271 svn+ssh://bugman@xxxxxxxxxxx/svn/relax/branches/multi_processor . ..... r3271 | varioustoxins | 2007-04-23 13:06:57 +0200 (Mon, 23 Apr 2007) | 3 lines Changed paths: M /branches/multi_processor/multi/mpi4py_processor.py threaded results output to _TRY_ to avoid starving multiprocessor while waiting for stdio ------------------------------------------------------------------------ r3270 | varioustoxins | 2007-04-21 03:18:39 +0200 (Sat, 21 Apr 2007) | 2 lines Changed paths: M /branches/multi_processor/multi/mpi4py_processor.py forgot to remove sys.exit used in testing ------------------------------------------------------------------------ r3269 | varioustoxins | 2007-04-21 03:17:26 +0200 (Sat, 21 Apr 2007) | 2 lines Changed paths: M /branches/multi_processor/multi/PrependStringIO.py M /branches/multi_processor/multi/commands.py M /branches/multi_processor/multi/mpi4py_processor.py M /branches/multi_processor/multi/processor.py batched retrun results and better sys.exit and exception handling behaviour ..... Modified: branches/multi_processor_merge/multi/PrependStringIO.py branches/multi_processor_merge/multi/commands.py branches/multi_processor_merge/multi/mpi4py_processor.py branches/multi_processor_merge/multi/processor.py Modified: branches/multi_processor_merge/multi/PrependStringIO.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/PrependStringIO.py?rev=7736&r1=7735&r2=7736&view=diff ============================================================================== --- branches/multi_processor_merge/multi/PrependStringIO.py (original) +++ branches/multi_processor_merge/multi/PrependStringIO.py Thu Oct 16 00:24:12 2008 @@ -1,7 +1,6 @@ from StringIO import StringIO import sys -# these may need to be in c they cause an pprox 10% slowdown class PrependOut(StringIO): @@ -21,6 +20,9 @@ self.stream.write(string) #self.truncate(0) + # lost more functions needed use dict??? + def isatty(self,*args,**kwargs): + return stream.isatty(*args,**kwargs) # def flush(self): # self.stream.write(self.getvalue().rstrip(self.token)) # self.truncate(0) Modified: branches/multi_processor_merge/multi/commands.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/commands.py?rev=7736&r1=7735&r2=7736&view=diff ============================================================================== --- branches/multi_processor_merge/multi/commands.py (original) +++ branches/multi_processor_merge/multi/commands.py Thu Oct 16 00:24:12 2008 @@ -54,6 +54,19 @@ msg = processor.get_name() result = Result_string(msg,completed) processor.return_object(result) + +class Set_processor_property_command(Slave_command): + def __init__(self,property_map): + super(Set_processor_property_command,self).__init__() + self.property_map = property_map + + def run(self,processor,completed): + for property,value in self.property_map.items(): + try: + setattr(processor, property, value) + processor.return_object(NULL_RESULT) + except Exception, e: + processor.return_object(e) @@ -80,6 +93,8 @@ OFFSET_SHORT_MIN_PARAMS=0 OFFSET_SHORT_FK=1 OFFSET_SHORT_K=2 + + @@ -240,8 +255,8 @@ # add debug flag or extra channels that output immediately if processor.processor_size() > 1: pre_string = processor.rank_format_string() % processor.rank() - stderr_string = ' E>' - stdout_string = ' S>' + stderr_string = ' E> ' + stdout_string = ' S> ' else: pre_string = '' stderr_string = '' Modified: branches/multi_processor_merge/multi/mpi4py_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/mpi4py_processor.py?rev=7736&r1=7735&r2=7736&view=diff ============================================================================== --- branches/multi_processor_merge/multi/mpi4py_processor.py (original) +++ branches/multi_processor_merge/multi/mpi4py_processor.py Thu Oct 16 00:24:12 2008 @@ -29,15 +29,24 @@ import math import textwrap import traceback +import time +import Queue +import threading from multi.processor import Processor,Memo,Slave_command -from multi.processor import Result,Result_command,Result_string +from multi.processor import Result,Result_command,Result_string,Result_exception from multi.commands import Exit_command from copy import copy from multi.processor import Capturing_exception + + +in_main_loop = False + +# save original sys.exit to call after wrapper +_sys_exit = sys.exit # load mpi @@ -59,9 +68,7 @@ sys.stderr.write('exiting...\n\n') sys.exit() -# save original sys.exit to call after wrapper -if MPI.rank == 0: - _sys_exit = sys.exit + #FIXME: delete me #def rank_format_string(): @@ -72,10 +79,26 @@ #RANK_FORMAT_STRING = rank_format_string # wrapper sys.exit function +# CHECKME is status ok def exit(status=None): - exit_mpi() - _sys_exit(status) + if MPI.rank != 0: + if in_main_loop: + raise Exception('sys.exit unexpectedley called on slave!') + else: + sys.__stderr__.write('\n') + sys.__stderr__.write('***********************************************\n') + sys.__stderr__.write('\n') + sys.__stderr__.write('warning sys.exit called before mpi4py main loop\n') + sys.__stderr__.write('\n') + sys.__stderr__.write('***********************************************\n') + sys.__stderr__.write('\n') + MPI.COMM_WORLD.Abort() + else: + #print 'here' + exit_mpi() + #MPI.COMM_WORLD.Abort(1) + _sys_exit(status) def broadcast_command(command): for i in range(1,MPI.size): @@ -95,6 +118,59 @@ ditch_all_results() +class Batched_result_command(Result_command): + + def __init__(self,result_commands,completed=True): + super(Batched_result_command,self).__init__(completed=completed) + self.result_commands=result_commands + + + def run(self,relax,processor,batched_memo): + + processor.assert_on_master() + if batched_memo != None: + msg = "batched result commands shouldn't have memo values, memo: " + `batched_memo` + raise ValueError(msg) + + for result_command in self.result_commands: + processor.process_result(result_command) + + +class Exit_queue_result_command(Result_command): + def __init__(self,completed=True): + pass + +RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command() + +class Threaded_result_queue(object): + def __init__(self,mpi4py_processor): + + self.queue = Queue.Queue() + self.mpi4py_processor = mpi4py_processor + self.sleep_time =0.05 + + self.running=1 + # FIXME: syntax error here produces exception but no quit + self.thread1 = threading.Thread(target=self.workerThread) + self.thread1.setDaemon(1) + self.thread1.start() + + def workerThread(self): + + while True: + item=self.queue.get() + if item == RESULT_QUEUE_EXIT_COMMAND: + break + self.mpi4py_processor.process_result(item) + + + def put(self,job): + self.queue.put_nowait(job) + + def run_all(self): + self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND) + self.thread1.join() + @@ -103,7 +179,7 @@ - def __init__(self,relax_instance, chunkyness=3): + def __init__(self,relax_instance, chunkyness=1): super(Mpi4py_processor,self).__init__(relax_instance = relax_instance, chunkyness=chunkyness) @@ -116,6 +192,13 @@ self.command_queue=[] self.memo_map={} + self.batched_returns=True + self.result_list=None + + self.threaded_result_processing=True + + def abort(self): + MPI.COMM_WORLD.Abort() def add_to_queue(self,command,memo=None): self.command_queue.append(command) @@ -163,62 +246,87 @@ def get_name(self): return '%s-%s' % (MPI.Get_processor_name(),os.getpid()) + # CHECKME am i used def exit(self): exit_mpi() def return_object(self,result): - result.rank=MPI.rank - MPI.COMM_WORLD.Send(buf=result, dest=0) - - #FIXME: fill out - def process_result(self): - pass + result_object = None + #raise Exception('dummy') + if self.batched_returns: + is_batch_result = isinstance(result, Batched_result_command) + + + if is_batch_result: + result_object = result + else: + if self.result_list != None: + self.result_list.append(result) + else: + result_object=result + + + if result_object != None: + #FIXME check is used? + result_object.rank=MPI.rank + MPI.COMM_WORLD.Send(buf=result_object, dest=0) + + #FIXME: fill out generic result processing move to processor + def process_result(self,result): + + if isinstance(result, Result): + + if isinstance(result, Result_command): + memo=None + if result.memo_id != None: + memo=self.memo_map[result.memo_id] + result.run(self.relax_instance,self,memo) + if result.memo_id != None and result.completed: + del self.memo_map[result.memo_id] + + elif isinstance(result, Result_string): + #FIXME can't cope with multiple lines + self.save_stdout.write(result.string), + else: + message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) + raise Exception(message) def run_command_queue(self,queue): - self.assert_on_master() - - running_set=set() - idle_set=set([i for i in range(1,MPI.size)]) - - while len(queue) != 0: - - while len(idle_set) != 0: - if len(queue) != 0: - command = queue.pop() - dest=idle_set.pop() - MPI.COMM_WORLD.Send(buf=command,dest=dest) - running_set.add(dest) - else: - break - - - while len(running_set) !=0: - result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) - if isinstance(result, Exception): - #FIXME: clear command queue - # and finalise mpi (or restart it if we can! - # also tracebacks are no good - raise result - - if isinstance(result, Result): + self.assert_on_master() + + running_set=set() + idle_set=set([i for i in range(1,MPI.size)]) + + if self.threaded_result_processing: + result_queue=Threaded_result_queue(self) + + + while len(queue) != 0: + + while len(idle_set) != 0: + if len(queue) != 0: + command = queue.pop() + dest=idle_set.pop() + MPI.COMM_WORLD.Send(buf=command,dest=dest) + running_set.add(dest) + else: + break + + + while len(running_set) !=0: + result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) + #print result + if result.completed: idle_set.add(result.rank) running_set.remove(result.rank) - - if isinstance(result, Result_command): - memo=None - if result.memo_id != None: - memo=self.memo_map[result.memo_id] - result.run(self.relax_instance,self,memo) - if result.memo_id != None and result.completed: - del self.memo_map[result.memo_id] - - elif isinstance(result, Result_string): - #FIXME can't cope with multiple lines - self.save_stdout.write(result.string), + if self.threaded_result_processing: + result_queue.put(result) else: - message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) - raise Exception(message) + self.process_result(result) + + if self.threaded_result_processing: + result_queue.run_all() @@ -231,29 +339,55 @@ def run(self): - + global in_main_loop + in_main_loop= True if self.on_master(): - self.pre_run() - self.relax_instance.run() - self.post_run() + try: + self.pre_run() + self.relax_instance.run() + self.post_run() + except Exception,e: + # check me could be moved outside + #print e + traceback.print_exc(file=sys.stdout) + self.abort() # note this a modified exit that kills all MPI processors sys.exit() else: - - while not self.do_quit: - commands = MPI.COMM_WORLD.Recv(source=0) - - if not isinstance(commands,list): - commands = [commands] - last_command = len(commands)-1 - for i,command in enumerate(commands): - try: + try: + while not self.do_quit: + + commands = MPI.COMM_WORLD.Recv(source=0) + + + if not isinstance(commands,list): + commands = [commands] + last_command = len(commands)-1 + + if self.batched_returns: + self.result_list = [] + else: + self.result_list = None + + for i,command in enumerate(commands): + + #raise Exception('dummy') completed = (i == last_command) command.run(self,completed) - #raise Exception('dummy') - except Exception,e: - #self.return_object(e) - self.return_object(Capturing_exception(rank=self.rank(),name=self.get_name())) - + + + + if self.batched_returns: + self.return_object(Batched_result_command(result_commands=self.result_list)) + self.result_list=None + + except Exception,e: + self.result_list=None + capturing_exception = Capturing_exception(rank=self.rank(),name=self.get_name()) + exception_result = Result_exception(capturing_exception) + exception_result.rank=MPI.rank + MPI.COMM_WORLD.Send(buf=exception_result, dest=0) + + in_main_loop = False Modified: branches/multi_processor_merge/multi/processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/processor.py?rev=7736&r1=7735&r2=7736&view=diff ============================================================================== --- branches/multi_processor_merge/multi/processor.py (original) +++ branches/multi_processor_merge/multi/processor.py Thu Oct 16 00:24:12 2008 @@ -79,6 +79,9 @@ queue = [command for i in range(1,MPI.size)] self.run_command_queue(queue) + def abort(self): + sys.exit() + #FIXME: remname chunk* grain* def __init__(self,relax_instance,chunkyness=1): self.chunkyness = chunkyness @@ -122,9 +125,14 @@ format = '%%%di' % digits return format + + + class Result(object): def __init__(self,completed): self.completed=completed + self.memo_id=None + class Result_string(Result): @@ -147,6 +155,14 @@ super(Null_result_command,self).__init__(completed=completed) NULL_RESULT=Null_result_command() + +class Result_exception(Result_command): + def __init__(self,exception,completed=True): + super(Result_exception,self).__init__(completed=completed) + self.exception=exception + + def run(self,relax,processor,memos): + raise self.exception class Slave_command(object):