mailr3269 - in /branches/multi_processor/multi: PrependStringIO.py commands.py mpi4py_processor.py processor.py


Others Months | Index by Date | Thread Index
>>   [Date Prev] [Date Next] [Thread Prev] [Thread Next]

Header


Content

Posted by garyt on April 21, 2007 - 03:18:
Author: varioustoxins
Date: Sat Apr 21 03:17:26 2007
New Revision: 3269

URL: http://svn.gna.org/viewcvs/relax?rev=3269&view=rev
Log:
batched return results and better sys.exit and exception handling behaviour

Modified:
    branches/multi_processor/multi/PrependStringIO.py
    branches/multi_processor/multi/commands.py
    branches/multi_processor/multi/mpi4py_processor.py
    branches/multi_processor/multi/processor.py

Modified: branches/multi_processor/multi/PrependStringIO.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/PrependStringIO.py?rev=3269&r1=3268&r2=3269&view=diff
==============================================================================
--- branches/multi_processor/multi/PrependStringIO.py (original)
+++ branches/multi_processor/multi/PrependStringIO.py Sat Apr 21 03:17:26 2007
@@ -1,7 +1,6 @@
 from  StringIO import StringIO
 import sys
 
-# these may need to be in c they cause an pprox 10% slowdown
 
 class PrependOut(StringIO):
 
@@ -21,6 +20,9 @@
         self.stream.write(string)
         #self.truncate(0)
 
+    # lost more functions needed use dict???
+    def isatty(self,*args,**kwargs):
+        return stream.isatty(*args,**kwargs)
 #    def flush(self):
 #        self.stream.write(self.getvalue().rstrip(self.token))
 #        self.truncate(0)

Modified: branches/multi_processor/multi/commands.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/commands.py?rev=3269&r1=3268&r2=3269&view=diff
==============================================================================
--- branches/multi_processor/multi/commands.py (original)
+++ branches/multi_processor/multi/commands.py Sat Apr 21 03:17:26 2007
@@ -54,6 +54,19 @@
         msg = processor.get_name()
         result = Result_string(msg,completed)
         processor.return_object(result)
+
+class Set_processor_property_command(Slave_command):
+    def __init__(self,property_map):
+        super(Set_processor_property_command,self).__init__()
+        self.property_map = property_map
+
+    def run(self,processor,completed):
+        for property,value in self.property_map.items():
+            try:
+                setattr(processor, property, value)
+                processor.return_object(NULL_RESULT)
+            except Exception, e:
+                processor.return_object(e)
 
 
 
@@ -80,6 +93,8 @@
 OFFSET_SHORT_MIN_PARAMS=0
 OFFSET_SHORT_FK=1
 OFFSET_SHORT_K=2
+
+
 
 
 
@@ -240,8 +255,8 @@
         # add debug flag or extra channels that output immediately
         if processor.processor_size() > 1:
             pre_string = processor.rank_format_string() % processor.rank()
-            stderr_string = ' E>'
-            stdout_string  = ' S>'
+            stderr_string = ' E> '
+            stdout_string  = ' S> '
         else:
             pre_string = ''
             stderr_string = ''

Modified: branches/multi_processor/multi/mpi4py_processor.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3269&r1=3268&r2=3269&view=diff
==============================================================================
--- branches/multi_processor/multi/mpi4py_processor.py (original)
+++ branches/multi_processor/multi/mpi4py_processor.py Sat Apr 21 03:17:26 2007
@@ -31,11 +31,12 @@
 import traceback
 
 from multi.processor import Processor,Memo,Slave_command
-from multi.processor import Result,Result_command,Result_string
+from multi.processor import Result,Result_command,Result_string,Result_exception
 from multi.commands import Exit_command
 
 from copy import copy
 from multi.processor import Capturing_exception
+
 
 
 
@@ -60,8 +61,7 @@
     sys.exit()
 
 # save original sys.exit to call after wrapper
-if MPI.rank == 0:
-    _sys_exit =  sys.exit
+_sys_exit =  sys.exit
 
 #FIXME: delete me
 #def rank_format_string():
@@ -72,10 +72,16 @@
 #RANK_FORMAT_STRING = rank_format_string
 
 # wrapper sys.exit function
+# CHECKME is status ok
 def exit(status=None):
 
-    exit_mpi()
-    _sys_exit(status)
+    if MPI.rank != 0:
+        raise Exception('sys.exit unexpectedley called on slave!')
+    else:
+        #print 'here'
+        exit_mpi()
+        #MPI.COMM_WORLD.Abort(1)
+        _sys_exit(status)
 
 def broadcast_command(command):
     for i in range(1,MPI.size):
@@ -95,6 +101,24 @@
         ditch_all_results()
 
 
+class Batched_result_command(Result_command):
+
+    def __init__(self,result_commands,completed=True):
+        super(Batched_result_command,self).__init__(completed=completed)
+        self.result_commands=result_commands
+
+
+    def run(self,relax,processor,batched_memo):
+
+        processor.assert_on_master()
+        if batched_memo != None:
+            msg = "batched result commands shouldn't have memo values, memo: " + `batched_memo`
+            raise ValueError(msg)
+
+        for result_command in self.result_commands:
+            processor.process_result(result_command)
+
+
 
 
 
@@ -116,6 +140,12 @@
         self.command_queue=[]
         self.memo_map={}
 
+        self.batched_returns=True
+        self.result_list=None
+
+    def abort(self):
+        MPI.Finalize()
+        MPI.COMM_WORLD.Abort()
 
     def add_to_queue(self,command,memo=None):
         self.command_queue.append(command)
@@ -163,62 +193,77 @@
     def get_name(self):
         return '%s-%s' % (MPI.Get_processor_name(),os.getpid())
 
+    # CHECKME am i used
     def exit(self):
         exit_mpi()
 
     def return_object(self,result):
-        result.rank=MPI.rank
-        MPI.COMM_WORLD.Send(buf=result, dest=0)
-
-    #FIXME: fill out
-    def process_result(self):
-        pass
+        result_object = None
+        #raise Exception('dummy')
+        if self.batched_returns:
+            is_batch_result = isinstance(result, Batched_result_command)
+
+
+            if is_batch_result:
+                result_object = result
+            else:
+                if self.result_list != None:
+                    self.result_list.append(result)
+        else:
+            result_object=result
+
+
+        if result_object != None:
+            #FIXME check is used?
+            result_object.rank=MPI.rank
+            MPI.COMM_WORLD.Send(buf=result_object, dest=0)
+
+    #FIXME: fill out generic result processing move to processor
+    def process_result(self,result):
+
+        if isinstance(result, Result):
+
+            if isinstance(result, Result_command):
+                memo=None
+                if result.memo_id != None:
+                    memo=self.memo_map[result.memo_id]
+                result.run(self.relax_instance,self,memo)
+                if result.memo_id != None and result.completed:
+                    del self.memo_map[result.memo_id]
+
+            elif isinstance(result, Result_string):
+                #FIXME can't cope with multiple lines
+                self.save_stdout.write(result.string),
+        else:
+            message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result)
+            raise Exception(message)
 
     def run_command_queue(self,queue):
-        self.assert_on_master()
-
-        running_set=set()
-        idle_set=set([i for i in range(1,MPI.size)])
-
-        while len(queue) != 0:
-
-            while len(idle_set) != 0:
-                if len(queue) != 0:
-                    command = queue.pop()
-                    dest=idle_set.pop()
-                    MPI.COMM_WORLD.Send(buf=command,dest=dest)
-                    running_set.add(dest)
-                else:
-                    break
-
-
-            while len(running_set) !=0:
-                result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
-                if isinstance(result, Exception):
-                    #FIXME: clear command queue
-                    #       and finalise mpi (or restart it if we can!
-                    # also tracebacks are no good
-                    raise result
-
-                if isinstance(result, Result):
+            self.assert_on_master()
+
+            running_set=set()
+            idle_set=set([i for i in range(1,MPI.size)])
+
+            while len(queue) != 0:
+
+                while len(idle_set) != 0:
+                    if len(queue) != 0:
+                        command = queue.pop()
+                        dest=idle_set.pop()
+                        MPI.COMM_WORLD.Send(buf=command,dest=dest)
+                        running_set.add(dest)
+                    else:
+                        break
+
+
+                while len(running_set) !=0:
+                    result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
+                    print result
+
                     if result.completed:
                         idle_set.add(result.rank)
                         running_set.remove(result.rank)
-
-                    if isinstance(result, Result_command):
-                        memo=None
-                        if result.memo_id != None:
-                            memo=self.memo_map[result.memo_id]
-                        result.run(self.relax_instance,self,memo)
-                        if result.memo_id != None and result.completed:
-                            del self.memo_map[result.memo_id]
-
-                    elif isinstance(result, Result_string):
-                        #FIXME can't cope with multiple lines
-                        self.save_stdout.write(result.string),
-                    else:
-                        message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result)
-                        raise Exception(message)
+                    self.process_result(result)
 
 
 
@@ -234,26 +279,49 @@
 
 
         if self.on_master():
-            self.pre_run()
-            self.relax_instance.run()
-            self.post_run()
+            try:
+                self.pre_run()
+                self.relax_instance.run()
+                self.post_run()
+            except Exception,e:
+                # check me could be moved outside
+                print e
+                self.abort()
 
             # note this a modified exit that kills all MPI processors
             sys.exit()
         else:
-
-            while not self.do_quit:
-                commands = MPI.COMM_WORLD.Recv(source=0)
-
-                if not isinstance(commands,list):
-                    commands =  [commands]
-                last_command = len(commands)-1
-                for i,command  in enumerate(commands):
-                    try:
+            try:
+
+                while not self.do_quit:
+                    commands = MPI.COMM_WORLD.Recv(source=0)
+                    sys.exit()
+
+                    if not isinstance(commands,list):
+                        commands =  [commands]
+                    last_command = len(commands)-1
+
+                    if self.batched_returns:
+                        self.result_list = []
+                    else:
+                        self.result_list = None
+
+                    for i,command  in enumerate(commands):
+
+                        #raise Exception('dummy')
                         completed = (i == last_command)
                         command.run(self,completed)
-                        #raise Exception('dummy')
-                    except Exception,e:
-                        #self.return_object(e)
-                        self.return_object(Capturing_exception(rank=self.rank(),name=self.get_name()))
-
+
+
+
+                    if self.batched_returns:
+                        self.return_object(Batched_result_command(result_commands=self.result_list))
+                        self.result_list=None
+
+            except Exception,e:
+                self.result_list=None
+                capturing_exception = Capturing_exception(rank=self.rank(),name=self.get_name())
+                exception_result = Result_exception(capturing_exception)
+                exception_result.rank=MPI.rank
+                MPI.COMM_WORLD.Send(buf=exception_result, dest=0)
+

Modified: branches/multi_processor/multi/processor.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/processor.py?rev=3269&r1=3268&r2=3269&view=diff
==============================================================================
--- branches/multi_processor/multi/processor.py (original)
+++ branches/multi_processor/multi/processor.py Sat Apr 21 03:17:26 2007
@@ -79,6 +79,9 @@
         queue = [command for i in range(1,MPI.size)]
         self.run_command_queue(queue)
 
+    def abort(self):
+        sys.exit()
+
     #FIXME: remname chunk* grain*
     def __init__(self,relax_instance,chunkyness=1):
         self.chunkyness = chunkyness
@@ -122,9 +125,14 @@
         format = '%%%di' % digits
         return format
 
+
+
+
 class Result(object):
     def __init__(self,completed):
         self.completed=completed
+        self.memo_id=None
+
 
 
 class Result_string(Result):
@@ -147,6 +155,14 @@
         super(Null_result_command,self).__init__(completed=completed)
 
 NULL_RESULT=Null_result_command()
+
+class Result_exception(Result_command):
+    def __init__(self,exception,completed=True):
+        super(Result_exception,self).__init__(completed=completed)
+        self.exception=exception
+
+    def run(self,relax,processor,memos):
+        raise self.exception
 
 
 class Slave_command(object):




Related Messages


Powered by MHonArc, Updated Sat Apr 21 03:20:05 2007