Author: varioustoxins Date: Sat Apr 28 19:59:04 2007 New Revision: 3275 URL: http://svn.gna.org/viewcvs/relax?rev=3275&view=rev Log: improved output and exception handling Modified: branches/multi_processor/minimise/generic.py branches/multi_processor/multi/PrependStringIO.py branches/multi_processor/multi/commands.py branches/multi_processor/multi/mpi4py_processor.py branches/multi_processor/multi/processor.py Modified: branches/multi_processor/minimise/generic.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/minimise/generic.py?rev=3275&r1=3274&r2=3275&view=diff ============================================================================== --- branches/multi_processor/minimise/generic.py (original) +++ branches/multi_processor/minimise/generic.py Sat Apr 28 19:59:04 2007 @@ -425,6 +425,6 @@ else: print print_prefix + "Parameter values: " + `results` print "" - #FIXME: raising an exception here wedges mpi4py + return results Modified: branches/multi_processor/multi/PrependStringIO.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/PrependStringIO.py?rev=3275&r1=3274&r2=3275&view=diff ============================================================================== --- branches/multi_processor/multi/PrependStringIO.py (original) +++ branches/multi_processor/multi/PrependStringIO.py Sat Apr 28 19:59:04 2007 @@ -3,7 +3,7 @@ - +#FIXME could these two classes be merged via use of a target stream and multiple inheritance? class PrependOut(StringIO): def __init__(self,token,stream): @@ -37,27 +37,29 @@ class PrependStringIO(StringIO): - def __init__(self,token): + def __init__(self,token,target_stream=None): StringIO.__init__(self) self.token = token self.token_length = len(token) self.first_time = True + if target_stream == None: + self.target_stream=self + else: + self.target_stream=target_stream + + def write(self,string): # FIXME: raising an exception here wedges mpi4py - file_name = sys._getframe(1).f_code.co_filename.split('/')[-1] - function_name = sys._getframe(1).f_code.co_name - line_number = sys._getframe(1).f_lineno - #msg = '<<%d - %s - %s - %d: %s>>' %(id(self),file_name,function_name,line_number,string) - #sys.__stdout__.write(msg) + string = string.replace('\n', '\n' + self.token) if self.first_time == True: string ='\n' +self.token + string self.first_time = False - StringIO.write(self,string) + StringIO.write(self.target_stream,string) Modified: branches/multi_processor/multi/commands.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/commands.py?rev=3275&r1=3274&r2=3275&view=diff ============================================================================== --- branches/multi_processor/multi/commands.py (original) +++ branches/multi_processor/multi/commands.py Sat Apr 28 19:59:04 2007 @@ -24,7 +24,7 @@ from multi.PrependStringIO import PrependStringIO from multi.processor import Memo,Slave_command -from multi.processor import Result_command,Result_string,NULL_RESULT +from multi.processor import Result_command,Result_string from re import match from maths_fns.mf import Mf @@ -41,7 +41,7 @@ super(Exit_command,self).__init__() def run(self,processor,completed): - processor.return_object(NULL_RESULT) + processor.return_object(processor.NULL_RESULT) processor.do_quit=True @@ -64,7 +64,7 @@ for property,value in self.property_map.items(): try: setattr(processor, property, value) - processor.return_object(NULL_RESULT) + processor.return_object(processor.NULL_RESULT) except Exception, e: processor.return_object(e) @@ -99,8 +99,8 @@ class MF_result_command(Result_command): - def __init__(self,memo_id,param_vector, func, iter, fc, gc, hc, warning,completed): - super(MF_result_command,self).__init__(completed=completed) + def __init__(self,processor,memo_id,param_vector, func, iter, fc, gc, hc, warning,completed): + super(MF_result_command,self).__init__(processor=processor,completed=completed) self.memo_id=memo_id self.param_vector=param_vector self.func=func @@ -239,8 +239,8 @@ param_vector, func, iter, fc, gc, hc, warning = results result_string = sys.stdout.getvalue() + sys.stderr.getvalue() - processor.return_object(MF_result_command(self.memo_id,param_vector, func, iter, fc, gc, hc, warning,completed=False)) - processor.return_object(Result_string(result_string,completed=completed)) + processor.return_object(MF_result_command(processor,self.memo_id,param_vector, func, iter, fc, gc, hc, warning,completed=False)) + processor.return_object(Result_string(processor,result_string,completed=completed)) def pre_command_feed_back(self,processor): self.do_feedback() @@ -262,7 +262,7 @@ stderr_string = '' stdout_string = '' sys.stdout = PrependStringIO(pre_string + stdout_string) - sys.stderr = PrependStringIO(pre_string + stderr_string) + sys.stderr = PrependStringIO(pre_string + stderr_string,target_stream=sys.stdout) def post_run(self,processor): #FIXME: move to processor startup @@ -359,7 +359,7 @@ param_vector, func, iter, fc, gc, hc, warning = results result_string = sys.stdout.getvalue() + sys.stderr.getvalue() - processor.return_object(MF_grid_result_command(result_string,self.memo_id,param_vector, func, iter, fc, gc, hc, warning,completed=completed)) + processor.return_object(MF_grid_result_command(processor,result_string,self.memo_id,param_vector, func, iter, fc, gc, hc, warning,completed=completed)) class MF_grid_memo(Memo): def __init__(self,super_grid_memo): @@ -420,8 +420,8 @@ self.h_count += results[OFFSET_H_COUNT] if results[OFFSET_WARNING] != None: self.warning.append(results[OFFSET_WARNING]) - #FIXME: - #TESTME: do we sue short results? + + #FIXME: TESTME: do we use short results? else: if results[OFFSET_SHORT_FK] < self.short_result[OFFSET_SHORT_FK]: self.short_result[OFFSET_SHORT_MIN_PARAMS] = results[OFFSET_SHORT_MIN_PARAMS] @@ -445,8 +445,8 @@ #print '****', self.xk,self.fk,self.k,self.f_count,self.g_count,self.h_count,self.warning class MF_grid_result_command(Result_command): - def __init__(self,result_string,memo_id,param_vector, func, iter, fc, gc, hc, warning,completed): - super(MF_grid_result_command,self).__init__(completed=completed) + def __init__(self,processor,result_string,memo_id,param_vector, func, iter, fc, gc, hc, warning,completed): + super(MF_grid_result_command,self).__init__(processor=processor,completed=completed) self.result_string=result_string self.memo_id=memo_id self.param_vector=param_vector Modified: branches/multi_processor/multi/mpi4py_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3275&r1=3274&r2=3275&view=diff ============================================================================== --- branches/multi_processor/multi/mpi4py_processor.py (original) +++ branches/multi_processor/multi/mpi4py_processor.py Sat Apr 28 19:59:04 2007 @@ -23,7 +23,7 @@ ################################################################################ # TODO: clone communicators & resize -# TODO: check exceptiosn on master +# TODO: check exceptions on master import sys import os import math @@ -39,6 +39,7 @@ from copy import copy from multi.processor import Capturing_exception +from processor import raise_unimplimented @@ -120,8 +121,8 @@ class Batched_result_command(Result_command): - def __init__(self,result_commands,completed=True): - super(Batched_result_command,self).__init__(completed=completed) + def __init__(self,processor,result_commands,completed=True): + super(Batched_result_command,self).__init__(processor=processor,completed=completed) self.result_commands=result_commands @@ -141,12 +142,21 @@ pass RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command() - -class Threaded_result_queue(object): +class Result_queue(object): def __init__(self,mpi4py_processor): - + self.mpi4py_processor = mpi4py_processor + + def put(self,job): + if isinstance(job, Result_exception) : + self.mpi4py_processor.process_result(job) + + def run_all(self): + raise_unimplimented(self.run_all) + +class Threaded_result_queue(Result_queue): + def __init__(self,mpi4py_processor): + super(Threaded_result_queue,self).__init__(mpi4py_processor) self.queue = Queue.Queue() - self.mpi4py_processor = mpi4py_processor self.sleep_time =0.05 self.running=1 @@ -158,21 +168,30 @@ def workerThread(self): while True: - item=self.queue.get() - if item == RESULT_QUEUE_EXIT_COMMAND: + job=self.queue.get() + if job == RESULT_QUEUE_EXIT_COMMAND: break - self.mpi4py_processor.process_result(item) + self.mpi4py_processor.process_result(job) def put(self,job): + super(Threaded_result_queue,self).put(job) self.queue.put_nowait(job) def run_all(self): self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND) self.thread1.join() - - +class Immediate_result_queue(Result_queue): + def __init(self,mpi4py_processor): + super(Threaded_result_queue,self).__init__(mpi4py_processor) + + def put(self,job): + super(Immediate_result_queue,self).put(job) + self.mpi4py_processor.process_result(job) + + def run_all(self): + pass #FIXME: do some inheritance class Mpi4py_processor(Processor): @@ -181,6 +200,7 @@ def __init__(self,relax_instance, chunkyness=1): super(Mpi4py_processor,self).__init__(relax_instance = relax_instance, chunkyness=chunkyness) + # wrap sys.exit to close down mpi before exiting @@ -251,9 +271,12 @@ exit_mpi() def return_object(self,result): + result_object = None #raise Exception('dummy') - if self.batched_returns: + if isinstance(result, Result_exception): + result_object=result + elif self.batched_returns: is_batch_result = isinstance(result, Batched_result_command) @@ -266,10 +289,22 @@ result_object=result + if result_object != None: #FIXME check is used? result_object.rank=MPI.rank MPI.COMM_WORLD.Send(buf=result_object, dest=0) + +# def queue_result_processing(self,result): +# # exceptions are handled instantly not queued to avoid deadlock! +# if isinstance(result, Result_exception): +# sys.exit() +# self.process_result(result) +# +# if self.threaded_result_processing: +# self.result_queue.put(result) +# else: +# self.process_result(result) #FIXME: fill out generic result processing move to processor def process_result(self,result): @@ -299,7 +334,8 @@ if self.threaded_result_processing: result_queue=Threaded_result_queue(self) - + else: + result_queue=Immediate_result_queue(self) while len(queue) != 0: @@ -315,15 +351,15 @@ while len(running_set) !=0: result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) - #print result + #if isinstance(result, Result_exception): + # print 'result', result + # sys.exit() if result.completed: idle_set.add(result.rank) running_set.remove(result.rank) - if self.threaded_result_processing: - result_queue.put(result) - else: - self.process_result(result) + + result_queue.put(result) if self.threaded_result_processing: result_queue.run_all() @@ -336,6 +372,11 @@ result = True return result + def print_message(self,message): + f=open ('error' + `self.rank()` + '.txt','a') + f.write(message+'\n') + f.flush() + f.close() def run(self): @@ -356,8 +397,9 @@ # note this a modified exit that kills all MPI processors sys.exit() else: - try: - while not self.do_quit: + + while not self.do_quit: + try: commands = MPI.COMM_WORLD.Recv(source=0) @@ -380,14 +422,20 @@ if self.batched_returns: - self.return_object(Batched_result_command(result_commands=self.result_list)) + self.return_object(Batched_result_command(processor=self,result_commands=self.result_list)) self.result_list=None - except Exception,e: - self.result_list=None - capturing_exception = Capturing_exception(rank=self.rank(),name=self.get_name()) - exception_result = Result_exception(capturing_exception) - exception_result.rank=MPI.rank - MPI.COMM_WORLD.Send(buf=exception_result, dest=0) + + except: + capturing_exception = Capturing_exception(rank=self.rank(),name=self.get_name()) + exception_result = Result_exception(exception=capturing_exception,processor=self,completed=True) + #error = 'sending exception' + `e` + e.__str__() + #self.print_message(error) + #result = Result_string('sending exception' + `e`, True) + #exception_result.rank=MPI.rank + self.return_object(exception_result) + #error = 'sending exception' + `e` + e.__str__() + #MPI.COMM_WORLD.Send(buf=exception_result, dest=0) + self.result_list=None in_main_loop = False Modified: branches/multi_processor/multi/processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/processor.py?rev=3275&r1=3274&r2=3275&view=diff ============================================================================== --- branches/multi_processor/multi/processor.py (original) +++ branches/multi_processor/multi/processor.py Sat Apr 28 19:59:04 2007 @@ -29,6 +29,21 @@ import traceback,textwrap +def print_file_lineno(range=xrange(1,2)): + + + for level in range: + print '<< ', level, + try: + file_name = sys._getframe(level).f_code.co_filename + function_name = sys._getframe(level).f_code.co_name + line_number = sys._getframe(level).f_lineno + msg = ': %s - %s - %d>>' %(file_name,function_name,line_number) + print msg + except Exception, e: + print e + break + def raise_unimplimented(method): raise NotImplementedError("Attempt to invoke unimplemented abstract method %s") % method.__name__ @@ -40,6 +55,14 @@ class Processor(object): + #FIXME: remname chunk* grain* + def __init__(self,relax_instance,chunkyness=1): + self.pre_queue_command=None + self.post_queue_command=None + self.chunkyness = chunkyness + self.relax_instance = relax_instance + self.NULL_RESULT=Null_result_command(processor=self) + def add_to_queue(self,command,memo=None): raise_unimplimented(self.add_to_queue) @@ -82,10 +105,6 @@ def abort(self): sys.exit() - #FIXME: remname chunk* grain* - def __init__(self,relax_instance,chunkyness=1): - self.chunkyness = chunkyness - self.relax_instance = relax_instance def pre_run(self): if self.on_master(): @@ -97,8 +116,10 @@ if self.processor_size() > 1: pre_string = 'M'*self.rank_format_string_width() + sys.stdout = PrependOut(pre_string + ' S> ', sys.stdout) - sys.stderr = PrependOut(pre_string + ' E> ', sys.stderr) + #FIXME: seems to be that writing to stderr results leeds to incorrect serialisation of output + sys.stderr = PrependOut(pre_string + ' E> ', sys.__stdout__) def get_time_delta(self,start_time,end_time): @@ -129,36 +150,37 @@ class Result(object): - def __init__(self,completed): + def __init__(self,processor,completed): self.completed=completed self.memo_id=None - + self.rank = processor.rank() class Result_string(Result): #FIXME move result up a level - def __init__(self,string,completed): - super(Result_string,self).__init__(completed=completed) + def __init__(self,processor,string,completed): + super(Result_string,self).__init__(processor=processor,completed=completed) self.string=string class Result_command(Result): - def __init__(self,completed,memo_id=None): - super(Result_command,self).__init__(completed=completed) + def __init__(self,processor,completed,memo_id=None): + super(Result_command,self).__init__(processor=processor,completed=completed) self.memo_id=memo_id + def run(self,relax,processor,memo): pass class Null_result_command(Result_command): - def __init__(self,completed=True): - super(Null_result_command,self).__init__(completed=completed) - -NULL_RESULT=Null_result_command() + def __init__(self,processor,completed=True): + super(Null_result_command,self).__init__(processor=processor,completed=completed) + + class Result_exception(Result_command): - def __init__(self,exception,completed=True): - super(Result_exception,self).__init__(completed=completed) + def __init__(self,processor,exception,completed=True): + super(Result_exception,self).__init__(processor=processor,completed=completed) self.exception=exception def run(self,relax,processor,memos):