mailr3275 - in /branches/multi_processor: minimise/ multi/


Others Months | Index by Date | Thread Index
>>   [Date Prev] [Date Next] [Thread Prev] [Thread Next]

Header


Content

Posted by garyt on April 28, 2007 - 19:59:
Author: varioustoxins
Date: Sat Apr 28 19:59:04 2007
New Revision: 3275

URL: http://svn.gna.org/viewcvs/relax?rev=3275&view=rev
Log:
improved output and exception handling

Modified:
    branches/multi_processor/minimise/generic.py
    branches/multi_processor/multi/PrependStringIO.py
    branches/multi_processor/multi/commands.py
    branches/multi_processor/multi/mpi4py_processor.py
    branches/multi_processor/multi/processor.py

Modified: branches/multi_processor/minimise/generic.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/minimise/generic.py?rev=3275&r1=3274&r2=3275&view=diff
==============================================================================
--- branches/multi_processor/minimise/generic.py (original)
+++ branches/multi_processor/minimise/generic.py Sat Apr 28 19:59:04 2007
@@ -425,6 +425,6 @@
         else:
             print print_prefix + "Parameter values: " + `results`
         print ""
-        #FIXME: raising an exception here wedges mpi4py
+
 
     return results

Modified: branches/multi_processor/multi/PrependStringIO.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/PrependStringIO.py?rev=3275&r1=3274&r2=3275&view=diff
==============================================================================
--- branches/multi_processor/multi/PrependStringIO.py (original)
+++ branches/multi_processor/multi/PrependStringIO.py Sat Apr 28 19:59:04 2007
@@ -3,7 +3,7 @@
 
 
 
-
+#FIXME could these two classes be merged via use of a target stream and 
multiple inheritance?
 class PrependOut(StringIO):
 
     def __init__(self,token,stream):
@@ -37,27 +37,29 @@
 
 class PrependStringIO(StringIO):
 
-    def __init__(self,token):
+    def __init__(self,token,target_stream=None):
         StringIO.__init__(self)
         self.token = token
         self.token_length = len(token)
         self.first_time = True
+        if target_stream == None:
+            self.target_stream=self
+        else:
+            self.target_stream=target_stream
+
+
 
 
     def write(self,string):
         # FIXME: raising an exception here wedges mpi4py
-        file_name = sys._getframe(1).f_code.co_filename.split('/')[-1]
-        function_name = sys._getframe(1).f_code.co_name
-        line_number = sys._getframe(1).f_lineno
-        #msg = '<<%d - %s - %s - %d: %s>>'  
%(id(self),file_name,function_name,line_number,string)
-        #sys.__stdout__.write(msg)
+
         string = string.replace('\n', '\n' + self.token)
         if self.first_time == True:
             string ='\n' +self.token + string
             self.first_time = False
 
 
-        StringIO.write(self,string)
+        StringIO.write(self.target_stream,string)
 
 
 

Modified: branches/multi_processor/multi/commands.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/commands.py?rev=3275&r1=3274&r2=3275&view=diff
==============================================================================
--- branches/multi_processor/multi/commands.py (original)
+++ branches/multi_processor/multi/commands.py Sat Apr 28 19:59:04 2007
@@ -24,7 +24,7 @@
 from  multi.PrependStringIO import PrependStringIO
 
 from multi.processor import Memo,Slave_command
-from multi.processor import Result_command,Result_string,NULL_RESULT
+from multi.processor import Result_command,Result_string
 from re import match
 
 from maths_fns.mf import Mf
@@ -41,7 +41,7 @@
         super(Exit_command,self).__init__()
 
     def run(self,processor,completed):
-        processor.return_object(NULL_RESULT)
+        processor.return_object(processor.NULL_RESULT)
         processor.do_quit=True
 
 
@@ -64,7 +64,7 @@
         for property,value in self.property_map.items():
             try:
                 setattr(processor, property, value)
-                processor.return_object(NULL_RESULT)
+                processor.return_object(processor.NULL_RESULT)
             except Exception, e:
                 processor.return_object(e)
 
@@ -99,8 +99,8 @@
 
 
 class MF_result_command(Result_command):
-    def __init__(self,memo_id,param_vector, func, iter, fc, gc, hc, 
warning,completed):
-        super(MF_result_command,self).__init__(completed=completed)
+    def __init__(self,processor,memo_id,param_vector, func, iter, fc, gc, 
hc, warning,completed):
+        
super(MF_result_command,self).__init__(processor=processor,completed=completed)
         self.memo_id=memo_id
         self.param_vector=param_vector
         self.func=func
@@ -239,8 +239,8 @@
         param_vector, func, iter, fc, gc, hc, warning = results
 
         result_string = sys.stdout.getvalue() + sys.stderr.getvalue()
-        processor.return_object(MF_result_command(self.memo_id,param_vector, 
func, iter, fc, gc, hc, warning,completed=False))
-        
processor.return_object(Result_string(result_string,completed=completed))
+        
processor.return_object(MF_result_command(processor,self.memo_id,param_vector,
 func, iter, fc, gc, hc, warning,completed=False))
+        
processor.return_object(Result_string(processor,result_string,completed=completed))
 
     def pre_command_feed_back(self,processor):
         self.do_feedback()
@@ -262,7 +262,7 @@
             stderr_string = ''
             stdout_string  = ''
         sys.stdout = PrependStringIO(pre_string + stdout_string)
-        sys.stderr = PrependStringIO(pre_string + stderr_string)
+        sys.stderr = PrependStringIO(pre_string + 
stderr_string,target_stream=sys.stdout)
 
     def post_run(self,processor):
         #FIXME: move to processor startup
@@ -359,7 +359,7 @@
         param_vector, func, iter, fc, gc, hc, warning = results
 
         result_string = sys.stdout.getvalue() + sys.stderr.getvalue()
-        
processor.return_object(MF_grid_result_command(result_string,self.memo_id,param_vector,
 func, iter, fc, gc, hc, warning,completed=completed))
+        
processor.return_object(MF_grid_result_command(processor,result_string,self.memo_id,param_vector,
 func, iter, fc, gc, hc, warning,completed=completed))
 
 class MF_grid_memo(Memo):
     def __init__(self,super_grid_memo):
@@ -420,8 +420,8 @@
             self.h_count += results[OFFSET_H_COUNT]
             if results[OFFSET_WARNING] != None:
                 self.warning.append(results[OFFSET_WARNING])
-        #FIXME:
-        #TESTME: do we sue short results?
+
+        #FIXME: TESTME: do we use short results?
         else:
             if results[OFFSET_SHORT_FK] < self.short_result[OFFSET_SHORT_FK]:
                 self.short_result[OFFSET_SHORT_MIN_PARAMS] = 
results[OFFSET_SHORT_MIN_PARAMS]
@@ -445,8 +445,8 @@
         #print   '****', 
self.xk,self.fk,self.k,self.f_count,self.g_count,self.h_count,self.warning
 
 class MF_grid_result_command(Result_command):
-    def __init__(self,result_string,memo_id,param_vector, func, iter, fc, 
gc, hc, warning,completed):
-        super(MF_grid_result_command,self).__init__(completed=completed)
+    def __init__(self,processor,result_string,memo_id,param_vector, func, 
iter, fc, gc, hc, warning,completed):
+        
super(MF_grid_result_command,self).__init__(processor=processor,completed=completed)
         self.result_string=result_string
         self.memo_id=memo_id
         self.param_vector=param_vector

Modified: branches/multi_processor/multi/mpi4py_processor.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3275&r1=3274&r2=3275&view=diff
==============================================================================
--- branches/multi_processor/multi/mpi4py_processor.py (original)
+++ branches/multi_processor/multi/mpi4py_processor.py Sat Apr 28 19:59:04 
2007
@@ -23,7 +23,7 @@
 
################################################################################
 
 # TODO: clone communicators & resize
-# TODO: check exceptiosn on master
+# TODO: check exceptions on master
 import sys
 import os
 import math
@@ -39,6 +39,7 @@
 
 from copy import copy
 from multi.processor import Capturing_exception
+from processor import raise_unimplimented
 
 
 
@@ -120,8 +121,8 @@
 
 class Batched_result_command(Result_command):
 
-    def __init__(self,result_commands,completed=True):
-        super(Batched_result_command,self).__init__(completed=completed)
+    def __init__(self,processor,result_commands,completed=True):
+        
super(Batched_result_command,self).__init__(processor=processor,completed=completed)
         self.result_commands=result_commands
 
 
@@ -141,12 +142,21 @@
         pass
 
 RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command()
-
-class Threaded_result_queue(object):
+class Result_queue(object):
     def __init__(self,mpi4py_processor):
-
+        self.mpi4py_processor = mpi4py_processor
+
+    def put(self,job):
+        if isinstance(job, Result_exception) :
+            self.mpi4py_processor.process_result(job)
+
+    def run_all(self):
+        raise_unimplimented(self.run_all)
+
+class Threaded_result_queue(Result_queue):
+    def __init__(self,mpi4py_processor):
+        super(Threaded_result_queue,self).__init__(mpi4py_processor)
         self.queue = Queue.Queue()
-        self.mpi4py_processor = mpi4py_processor
         self.sleep_time =0.05
 
         self.running=1
@@ -158,21 +168,30 @@
     def workerThread(self):
 
             while True:
-                item=self.queue.get()
-                if item == RESULT_QUEUE_EXIT_COMMAND:
+                job=self.queue.get()
+                if job == RESULT_QUEUE_EXIT_COMMAND:
                     break
-                self.mpi4py_processor.process_result(item)
+                self.mpi4py_processor.process_result(job)
 
 
     def put(self,job):
+        super(Threaded_result_queue,self).put(job)
         self.queue.put_nowait(job)
 
     def run_all(self):
         self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND)
         self.thread1.join()
 
-
-
+class Immediate_result_queue(Result_queue):
+    def __init(self,mpi4py_processor):
+        super(Threaded_result_queue,self).__init__(mpi4py_processor)
+
+    def put(self,job):
+        super(Immediate_result_queue,self).put(job)
+        self.mpi4py_processor.process_result(job)
+
+    def run_all(self):
+        pass
 
 #FIXME: do some inheritance
 class Mpi4py_processor(Processor):
@@ -181,6 +200,7 @@
 
     def __init__(self,relax_instance, chunkyness=1):
         super(Mpi4py_processor,self).__init__(relax_instance = 
relax_instance, chunkyness=chunkyness)
+
 
 
         # wrap sys.exit to close down mpi before exiting
@@ -251,9 +271,12 @@
         exit_mpi()
 
     def return_object(self,result):
+
         result_object = None
         #raise Exception('dummy')
-        if self.batched_returns:
+        if isinstance(result,  Result_exception):
+            result_object=result
+        elif self.batched_returns:
             is_batch_result = isinstance(result, Batched_result_command)
 
 
@@ -266,10 +289,22 @@
             result_object=result
 
 
+
         if result_object != None:
             #FIXME check is used?
             result_object.rank=MPI.rank
             MPI.COMM_WORLD.Send(buf=result_object, dest=0)
+
+#    def queue_result_processing(self,result):
+#        # exceptions are handled instantly not queued to avoid deadlock!
+#        if isinstance(result, Result_exception):
+#            sys.exit()
+#            self.process_result(result)
+#
+#        if self.threaded_result_processing:
+#            self.result_queue.put(result)
+#        else:
+#            self.process_result(result)
 
     #FIXME: fill out generic result processing move to processor
     def process_result(self,result):
@@ -299,7 +334,8 @@
 
             if self.threaded_result_processing:
                 result_queue=Threaded_result_queue(self)
-
+            else:
+                result_queue=Immediate_result_queue(self)
 
             while len(queue) != 0:
 
@@ -315,15 +351,15 @@
 
                 while len(running_set) !=0:
                     result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
-                    #print result
+                    #if isinstance(result, Result_exception):
+                    #    print 'result', result
+                    #    sys.exit()
 
                     if result.completed:
                         idle_set.add(result.rank)
                         running_set.remove(result.rank)
-                    if self.threaded_result_processing:
-                        result_queue.put(result)
-                    else:
-                        self.process_result(result)
+
+                    result_queue.put(result)
 
             if self.threaded_result_processing:
                 result_queue.run_all()
@@ -336,6 +372,11 @@
             result = True
         return result
 
+    def print_message(self,message):
+        f=open ('error' + `self.rank()` + '.txt','a')
+        f.write(message+'\n')
+        f.flush()
+        f.close()
 
     def run(self):
 
@@ -356,8 +397,9 @@
             # note this a modified exit that kills all MPI processors
             sys.exit()
         else:
-            try:
-                while not self.do_quit:
+
+            while not self.do_quit:
+                try:
 
                     commands = MPI.COMM_WORLD.Recv(source=0)
 
@@ -380,14 +422,20 @@
 
 
                     if self.batched_returns:
-                        
self.return_object(Batched_result_command(result_commands=self.result_list))
+                        
self.return_object(Batched_result_command(processor=self,result_commands=self.result_list))
                         self.result_list=None
 
-            except Exception,e:
-                self.result_list=None
-                capturing_exception = 
Capturing_exception(rank=self.rank(),name=self.get_name())
-                exception_result = Result_exception(capturing_exception)
-                exception_result.rank=MPI.rank
-                MPI.COMM_WORLD.Send(buf=exception_result, dest=0)
+
+                except:
+                    capturing_exception = 
Capturing_exception(rank=self.rank(),name=self.get_name())
+                    exception_result = 
Result_exception(exception=capturing_exception,processor=self,completed=True)
+                    #error = 'sending exception' + `e` + e.__str__()
+                    #self.print_message(error)
+                    #result = Result_string('sending exception' + `e`, True)
+                    #exception_result.rank=MPI.rank
+                    self.return_object(exception_result)
+                    #error = 'sending exception' + `e` + e.__str__()
+                    #MPI.COMM_WORLD.Send(buf=exception_result, dest=0)
+                    self.result_list=None
 
     in_main_loop = False

Modified: branches/multi_processor/multi/processor.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/processor.py?rev=3275&r1=3274&r2=3275&view=diff
==============================================================================
--- branches/multi_processor/multi/processor.py (original)
+++ branches/multi_processor/multi/processor.py Sat Apr 28 19:59:04 2007
@@ -29,6 +29,21 @@
 import traceback,textwrap
 
 
+def print_file_lineno(range=xrange(1,2)):
+
+
+    for level in range:
+        print '<< ', level,
+        try:
+            file_name = sys._getframe(level).f_code.co_filename
+            function_name = sys._getframe(level).f_code.co_name
+            line_number = sys._getframe(level).f_lineno
+            msg = ': %s - %s - %d>>'  %(file_name,function_name,line_number)
+            print msg
+        except Exception, e:
+            print e
+            break
+
 
 def raise_unimplimented(method):
     raise NotImplementedError("Attempt to invoke unimplemented abstract 
method %s") % method.__name__
@@ -40,6 +55,14 @@
 
 class Processor(object):
 
+    #FIXME: remname chunk* grain*
+    def __init__(self,relax_instance,chunkyness=1):
+        self.pre_queue_command=None
+        self.post_queue_command=None
+        self.chunkyness = chunkyness
+        self.relax_instance = relax_instance
+        self.NULL_RESULT=Null_result_command(processor=self)
+
     def add_to_queue(self,command,memo=None):
          raise_unimplimented(self.add_to_queue)
 
@@ -82,10 +105,6 @@
     def abort(self):
         sys.exit()
 
-    #FIXME: remname chunk* grain*
-    def __init__(self,relax_instance,chunkyness=1):
-        self.chunkyness = chunkyness
-        self.relax_instance = relax_instance
 
     def pre_run(self):
         if self.on_master():
@@ -97,8 +116,10 @@
         if self.processor_size() > 1:
 
             pre_string = 'M'*self.rank_format_string_width()
+
             sys.stdout = PrependOut(pre_string + ' S> ', sys.stdout)
-            sys.stderr = PrependOut(pre_string + ' E> ', sys.stderr)
+            #FIXME: seems to be that writing to stderr results leeds to 
incorrect serialisation of output
+            sys.stderr = PrependOut(pre_string + ' E> ', sys.__stdout__)
 
     def get_time_delta(self,start_time,end_time):
 
@@ -129,36 +150,37 @@
 
 
 class Result(object):
-    def __init__(self,completed):
+    def __init__(self,processor,completed):
         self.completed=completed
         self.memo_id=None
-
+        self.rank = processor.rank()
 
 
 class Result_string(Result):
     #FIXME move result up a level
-    def __init__(self,string,completed):
-        super(Result_string,self).__init__(completed=completed)
+    def __init__(self,processor,string,completed):
+        
super(Result_string,self).__init__(processor=processor,completed=completed)
         self.string=string
 
 
 class Result_command(Result):
-    def __init__(self,completed,memo_id=None):
-        super(Result_command,self).__init__(completed=completed)
+    def __init__(self,processor,completed,memo_id=None):
+        
super(Result_command,self).__init__(processor=processor,completed=completed)
         self.memo_id=memo_id
+
 
     def run(self,relax,processor,memo):
         pass
 
 class Null_result_command(Result_command):
-    def __init__(self,completed=True):
-        super(Null_result_command,self).__init__(completed=completed)
-
-NULL_RESULT=Null_result_command()
+    def __init__(self,processor,completed=True):
+        
super(Null_result_command,self).__init__(processor=processor,completed=completed)
+
+
 
 class Result_exception(Result_command):
-    def __init__(self,exception,completed=True):
-        super(Result_exception,self).__init__(completed=completed)
+    def __init__(self,processor,exception,completed=True):
+        
super(Result_exception,self).__init__(processor=processor,completed=completed)
         self.exception=exception
 
     def run(self,relax,processor,memos):




Related Messages


Powered by MHonArc, Updated Sun Apr 29 23:40:05 2007