Author: varioustoxins Date: Mon Apr 23 13:06:57 2007 New Revision: 3271 URL: http://svn.gna.org/viewcvs/relax?rev=3271&view=rev Log: threaded results output to _TRY_ to avoid starving multiprocessor while waiting for stdio Modified: branches/multi_processor/multi/mpi4py_processor.py Modified: branches/multi_processor/multi/mpi4py_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3271&r1=3270&r2=3271&view=diff ============================================================================== --- branches/multi_processor/multi/mpi4py_processor.py (original) +++ branches/multi_processor/multi/mpi4py_processor.py Mon Apr 23 13:06:57 2007 @@ -29,6 +29,9 @@ import math import textwrap import traceback +import time +import Queue +import threading from multi.processor import Processor,Memo,Slave_command from multi.processor import Result,Result_command,Result_string,Result_exception @@ -39,6 +42,11 @@ + +in_main_loop = False + +# save original sys.exit to call after wrapper +_sys_exit = sys.exit # load mpi @@ -60,8 +68,7 @@ sys.stderr.write('exiting...\n\n') sys.exit() -# save original sys.exit to call after wrapper -_sys_exit = sys.exit + #FIXME: delete me #def rank_format_string(): @@ -76,7 +83,17 @@ def exit(status=None): if MPI.rank != 0: - raise Exception('sys.exit unexpectedley called on slave!') + if in_main_loop: + raise Exception('sys.exit unexpectedley called on slave!') + else: + sys.__stderr__.write('\n') + sys.__stderr__.write('***********************************************\n') + sys.__stderr__.write('\n') + sys.__stderr__.write('warning sys.exit called before mpi4py main loop\n') + sys.__stderr__.write('\n') + sys.__stderr__.write('***********************************************\n') + sys.__stderr__.write('\n') + MPI.COMM_WORLD.Abort() else: #print 'here' exit_mpi() @@ -119,6 +136,41 @@ processor.process_result(result_command) +class Exit_queue_result_command(Result_command): + def __init__(self,completed=True): + pass + +RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command() + +class Threaded_result_queue(object): + def __init__(self,mpi4py_processor): + + self.queue = Queue.Queue() + self.mpi4py_processor = mpi4py_processor + self.sleep_time =0.05 + + self.running=1 + # FIXME: syntax error here produces exception but no quit + self.thread1 = threading.Thread(target=self.workerThread) + self.thread1.setDaemon(1) + self.thread1.start() + + def workerThread(self): + + while True: + item=self.queue.get() + if item == RESULT_QUEUE_EXIT_COMMAND: + break + self.mpi4py_processor.process_result(item) + + + def put(self,job): + self.queue.put_nowait(job) + + def run_all(self): + self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND) + self.thread1.join() + @@ -127,7 +179,7 @@ - def __init__(self,relax_instance, chunkyness=3): + def __init__(self,relax_instance, chunkyness=1): super(Mpi4py_processor,self).__init__(relax_instance = relax_instance, chunkyness=chunkyness) @@ -143,8 +195,9 @@ self.batched_returns=True self.result_list=None + self.threaded_result_processing=True + def abort(self): - MPI.Finalize() MPI.COMM_WORLD.Abort() def add_to_queue(self,command,memo=None): @@ -243,6 +296,10 @@ running_set=set() idle_set=set([i for i in range(1,MPI.size)]) + + if self.threaded_result_processing: + result_queue=Threaded_result_queue(self) + while len(queue) != 0: @@ -258,12 +315,18 @@ while len(running_set) !=0: result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) - print result + #print result if result.completed: idle_set.add(result.rank) running_set.remove(result.rank) - self.process_result(result) + if self.threaded_result_processing: + result_queue.put(result) + else: + self.process_result(result) + + if self.threaded_result_processing: + result_queue.run_all() @@ -276,7 +339,8 @@ def run(self): - + global in_main_loop + in_main_loop= True if self.on_master(): try: @@ -285,15 +349,16 @@ self.post_run() except Exception,e: # check me could be moved outside - print e + #print e + traceback.print_exc(file=sys.stdout) self.abort() # note this a modified exit that kills all MPI processors sys.exit() else: try: - while not self.do_quit: + commands = MPI.COMM_WORLD.Recv(source=0) @@ -325,3 +390,4 @@ exception_result.rank=MPI.rank MPI.COMM_WORLD.Send(buf=exception_result, dest=0) + in_main_loop = False