r7736 - in /branches/multi_processor_merge/multi: PrependStringIO.py commands.py mpi4py_processor.py processor.py


Other Months | Index by Date | Thread Index
>>   [Date Prev] [Date Next] [Thread Prev] [Thread Next]

Header


Content

Posted by edward on October 16, 2008 - 00:24:
Author: bugman
Date: Thu Oct 16 00:24:12 2008
New Revision: 7736

URL: http://svn.gna.org/viewcvs/relax?rev=7736&view=rev
Log:
Manually ported r3269, r3270, and r3271 from the multi_processor branch.

The command used was:
svn merge -r3268:3271 svn+ssh://bugman@xxxxxxxxxxx/svn/relax/branches/multi_processor .

.....
  r3271 | varioustoxins | 2007-04-23 13:06:57 +0200 (Mon, 23 Apr 2007) | 3 lines
  Changed paths:
     M /branches/multi_processor/multi/mpi4py_processor.py

  threaded results output to _TRY_ to avoid starving multiprocessor while
  waiting for stdio

  ------------------------------------------------------------------------
  r3270 | varioustoxins | 2007-04-21 03:18:39 +0200 (Sat, 21 Apr 2007) | 2 lines
  Changed paths:
     M /branches/multi_processor/multi/mpi4py_processor.py

  forgot to remove sys.exit used in testing

  ------------------------------------------------------------------------
  r3269 | varioustoxins | 2007-04-21 03:17:26 +0200 (Sat, 21 Apr 2007) | 2 lines
  Changed paths:
     M /branches/multi_processor/multi/PrependStringIO.py
     M /branches/multi_processor/multi/commands.py
     M /branches/multi_processor/multi/mpi4py_processor.py
     M /branches/multi_processor/multi/processor.py

  batched retrun results and better sys.exit and exception handling behaviour
.....


Modified:
    branches/multi_processor_merge/multi/PrependStringIO.py
    branches/multi_processor_merge/multi/commands.py
    branches/multi_processor_merge/multi/mpi4py_processor.py
    branches/multi_processor_merge/multi/processor.py

Modified: branches/multi_processor_merge/multi/PrependStringIO.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/PrependStringIO.py?rev=7736&r1=7735&r2=7736&view=diff
==============================================================================
--- branches/multi_processor_merge/multi/PrependStringIO.py (original)
+++ branches/multi_processor_merge/multi/PrependStringIO.py Thu Oct 16 00:24:12 2008
@@ -1,7 +1,6 @@
 from  StringIO import StringIO
 import sys
 
-# these may need to be in c they cause an pprox 10% slowdown
 
 class PrependOut(StringIO):
 
@@ -21,6 +20,9 @@
         self.stream.write(string)
         #self.truncate(0)
 
+    # lost more functions needed use dict???
+    def isatty(self,*args,**kwargs):
+        return stream.isatty(*args,**kwargs)
 #    def flush(self):
 #        self.stream.write(self.getvalue().rstrip(self.token))
 #        self.truncate(0)

Modified: branches/multi_processor_merge/multi/commands.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/commands.py?rev=7736&r1=7735&r2=7736&view=diff
==============================================================================
--- branches/multi_processor_merge/multi/commands.py (original)
+++ branches/multi_processor_merge/multi/commands.py Thu Oct 16 00:24:12 2008
@@ -54,6 +54,19 @@
         msg = processor.get_name()
         result = Result_string(msg,completed)
         processor.return_object(result)
+
+class Set_processor_property_command(Slave_command):
+    def __init__(self,property_map):
+        super(Set_processor_property_command,self).__init__()
+        self.property_map = property_map
+
+    def run(self,processor,completed):
+        for property,value in self.property_map.items():
+            try:
+                setattr(processor, property, value)
+                processor.return_object(NULL_RESULT)
+            except Exception, e:
+                processor.return_object(e)
 
 
 
@@ -80,6 +93,8 @@
 OFFSET_SHORT_MIN_PARAMS=0
 OFFSET_SHORT_FK=1
 OFFSET_SHORT_K=2
+
+
 
 
 
@@ -240,8 +255,8 @@
         # add debug flag or extra channels that output immediately
         if processor.processor_size() > 1:
             pre_string = processor.rank_format_string() % processor.rank()
-            stderr_string = ' E>'
-            stdout_string  = ' S>'
+            stderr_string = ' E> '
+            stdout_string  = ' S> '
         else:
             pre_string = ''
             stderr_string = ''

Modified: branches/multi_processor_merge/multi/mpi4py_processor.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/mpi4py_processor.py?rev=7736&r1=7735&r2=7736&view=diff
==============================================================================
--- branches/multi_processor_merge/multi/mpi4py_processor.py (original)
+++ branches/multi_processor_merge/multi/mpi4py_processor.py Thu Oct 16 00:24:12 2008
@@ -29,15 +29,24 @@
 import math
 import textwrap
 import traceback
+import time
+import Queue
+import threading
 
 from multi.processor import Processor,Memo,Slave_command
-from multi.processor import Result,Result_command,Result_string
+from multi.processor import Result,Result_command,Result_string,Result_exception
 from multi.commands import Exit_command
 
 from copy import copy
 from multi.processor import Capturing_exception
 
 
+
+
+in_main_loop = False
+
+# save original sys.exit to call after wrapper
+_sys_exit =  sys.exit
 
 
 # load mpi
@@ -59,9 +68,7 @@
     sys.stderr.write('exiting...\n\n')
     sys.exit()
 
-# save original sys.exit to call after wrapper
-if MPI.rank == 0:
-    _sys_exit =  sys.exit
+
 
 #FIXME: delete me
 #def rank_format_string():
@@ -72,10 +79,26 @@
 #RANK_FORMAT_STRING = rank_format_string
 
 # wrapper sys.exit function
+# CHECKME is status ok
 def exit(status=None):
 
-    exit_mpi()
-    _sys_exit(status)
+    if MPI.rank != 0:
+        if in_main_loop:
+            raise Exception('sys.exit unexpectedley called on slave!')
+        else:
+            sys.__stderr__.write('\n')
+            sys.__stderr__.write('***********************************************\n')
+            sys.__stderr__.write('\n')
+            sys.__stderr__.write('warning sys.exit called before mpi4py main loop\n')
+            sys.__stderr__.write('\n')
+            sys.__stderr__.write('***********************************************\n')
+            sys.__stderr__.write('\n')
+            MPI.COMM_WORLD.Abort()
+    else:
+        #print 'here'
+        exit_mpi()
+        #MPI.COMM_WORLD.Abort(1)
+        _sys_exit(status)
 
 def broadcast_command(command):
     for i in range(1,MPI.size):
@@ -95,6 +118,59 @@
         ditch_all_results()
 
 
+class Batched_result_command(Result_command):
+
+    def __init__(self,result_commands,completed=True):
+        super(Batched_result_command,self).__init__(completed=completed)
+        self.result_commands=result_commands
+
+
+    def run(self,relax,processor,batched_memo):
+
+        processor.assert_on_master()
+        if batched_memo != None:
+            msg = "batched result commands shouldn't have memo values, memo: " + `batched_memo`
+            raise ValueError(msg)
+
+        for result_command in self.result_commands:
+            processor.process_result(result_command)
+
+
+class Exit_queue_result_command(Result_command):
+    def __init__(self,completed=True):
+        pass
+
+RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command()
+
+class Threaded_result_queue(object):
+    def __init__(self,mpi4py_processor):
+
+        self.queue = Queue.Queue()
+        self.mpi4py_processor = mpi4py_processor
+        self.sleep_time =0.05
+
+        self.running=1
+        # FIXME: syntax error here produces exception but no quit
+        self.thread1 = threading.Thread(target=self.workerThread)
+        self.thread1.setDaemon(1)
+        self.thread1.start()
+
+    def workerThread(self):
+
+            while True:
+                item=self.queue.get()
+                if item == RESULT_QUEUE_EXIT_COMMAND:
+                    break
+                self.mpi4py_processor.process_result(item)
+
+
+    def put(self,job):
+        self.queue.put_nowait(job)
+
+    def run_all(self):
+        self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND)
+        self.thread1.join()
+
 
 
 
@@ -103,7 +179,7 @@
 
 
 
-    def __init__(self,relax_instance, chunkyness=3):
+    def __init__(self,relax_instance, chunkyness=1):
         super(Mpi4py_processor,self).__init__(relax_instance = relax_instance, chunkyness=chunkyness)
 
 
@@ -116,6 +192,13 @@
         self.command_queue=[]
         self.memo_map={}
 
+        self.batched_returns=True
+        self.result_list=None
+
+        self.threaded_result_processing=True
+
+    def abort(self):
+        MPI.COMM_WORLD.Abort()
 
     def add_to_queue(self,command,memo=None):
         self.command_queue.append(command)
@@ -163,62 +246,87 @@
     def get_name(self):
         return '%s-%s' % (MPI.Get_processor_name(),os.getpid())
 
+    # CHECKME am i used
     def exit(self):
         exit_mpi()
 
     def return_object(self,result):
-        result.rank=MPI.rank
-        MPI.COMM_WORLD.Send(buf=result, dest=0)
-
-    #FIXME: fill out
-    def process_result(self):
-        pass
+        result_object = None
+        #raise Exception('dummy')
+        if self.batched_returns:
+            is_batch_result = isinstance(result, Batched_result_command)
+
+
+            if is_batch_result:
+                result_object = result
+            else:
+                if self.result_list != None:
+                    self.result_list.append(result)
+        else:
+            result_object=result
+
+
+        if result_object != None:
+            #FIXME check is used?
+            result_object.rank=MPI.rank
+            MPI.COMM_WORLD.Send(buf=result_object, dest=0)
+
+    #FIXME: fill out generic result processing move to processor
+    def process_result(self,result):
+
+        if isinstance(result, Result):
+
+            if isinstance(result, Result_command):
+                memo=None
+                if result.memo_id != None:
+                    memo=self.memo_map[result.memo_id]
+                result.run(self.relax_instance,self,memo)
+                if result.memo_id != None and result.completed:
+                    del self.memo_map[result.memo_id]
+
+            elif isinstance(result, Result_string):
+                #FIXME can't cope with multiple lines
+                self.save_stdout.write(result.string),
+        else:
+            message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result)
+            raise Exception(message)
 
     def run_command_queue(self,queue):
-        self.assert_on_master()
-
-        running_set=set()
-        idle_set=set([i for i in range(1,MPI.size)])
-
-        while len(queue) != 0:
-
-            while len(idle_set) != 0:
-                if len(queue) != 0:
-                    command = queue.pop()
-                    dest=idle_set.pop()
-                    MPI.COMM_WORLD.Send(buf=command,dest=dest)
-                    running_set.add(dest)
-                else:
-                    break
-
-
-            while len(running_set) !=0:
-                result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
-                if isinstance(result, Exception):
-                    #FIXME: clear command queue
-                    #       and finalise mpi (or restart it if we can!
-                    # also tracebacks are no good
-                    raise result
-
-                if isinstance(result, Result):
+            self.assert_on_master()
+
+            running_set=set()
+            idle_set=set([i for i in range(1,MPI.size)])
+
+            if self.threaded_result_processing:
+                result_queue=Threaded_result_queue(self)
+
+
+            while len(queue) != 0:
+
+                while len(idle_set) != 0:
+                    if len(queue) != 0:
+                        command = queue.pop()
+                        dest=idle_set.pop()
+                        MPI.COMM_WORLD.Send(buf=command,dest=dest)
+                        running_set.add(dest)
+                    else:
+                        break
+
+
+                while len(running_set) !=0:
+                    result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
+                    #print result
+
                     if result.completed:
                         idle_set.add(result.rank)
                         running_set.remove(result.rank)
-
-                    if isinstance(result, Result_command):
-                        memo=None
-                        if result.memo_id != None:
-                            memo=self.memo_map[result.memo_id]
-                        result.run(self.relax_instance,self,memo)
-                        if result.memo_id != None and result.completed:
-                            del self.memo_map[result.memo_id]
-
-                    elif isinstance(result, Result_string):
-                        #FIXME can't cope with multiple lines
-                        self.save_stdout.write(result.string),
+                    if self.threaded_result_processing:
+                        result_queue.put(result)
                     else:
-                        message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result)
-                        raise Exception(message)
+                        self.process_result(result)
+
+            if self.threaded_result_processing:
+                result_queue.run_all()
 
 
 
@@ -231,29 +339,55 @@
 
     def run(self):
 
-
+        global in_main_loop
+        in_main_loop= True
 
         if self.on_master():
-            self.pre_run()
-            self.relax_instance.run()
-            self.post_run()
+            try:
+                self.pre_run()
+                self.relax_instance.run()
+                self.post_run()
+            except Exception,e:
+                # check me could be moved outside
+                #print e
+                traceback.print_exc(file=sys.stdout)
+                self.abort()
 
             # note this a modified exit that kills all MPI processors
             sys.exit()
         else:
-
-            while not self.do_quit:
-                commands = MPI.COMM_WORLD.Recv(source=0)
-
-                if not isinstance(commands,list):
-                    commands =  [commands]
-                last_command = len(commands)-1
-                for i,command  in enumerate(commands):
-                    try:
+            try:
+                while not self.do_quit:
+
+                    commands = MPI.COMM_WORLD.Recv(source=0)
+
+
+                    if not isinstance(commands,list):
+                        commands =  [commands]
+                    last_command = len(commands)-1
+
+                    if self.batched_returns:
+                        self.result_list = []
+                    else:
+                        self.result_list = None
+
+                    for i,command  in enumerate(commands):
+
+                        #raise Exception('dummy')
                         completed = (i == last_command)
                         command.run(self,completed)
-                        #raise Exception('dummy')
-                    except Exception,e:
-                        #self.return_object(e)
-                        self.return_object(Capturing_exception(rank=self.rank(),name=self.get_name()))
-
+
+
+
+                    if self.batched_returns:
+                        self.return_object(Batched_result_command(result_commands=self.result_list))
+                        self.result_list=None
+
+            except Exception,e:
+                self.result_list=None
+                capturing_exception = Capturing_exception(rank=self.rank(),name=self.get_name())
+                exception_result = Result_exception(capturing_exception)
+                exception_result.rank=MPI.rank
+                MPI.COMM_WORLD.Send(buf=exception_result, dest=0)
+
+    in_main_loop = False

Modified: branches/multi_processor_merge/multi/processor.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/processor.py?rev=7736&r1=7735&r2=7736&view=diff
==============================================================================
--- branches/multi_processor_merge/multi/processor.py (original)
+++ branches/multi_processor_merge/multi/processor.py Thu Oct 16 00:24:12 2008
@@ -79,6 +79,9 @@
         queue = [command for i in range(1,MPI.size)]
         self.run_command_queue(queue)
 
+    def abort(self):
+        sys.exit()
+
     #FIXME: remname chunk* grain*
     def __init__(self,relax_instance,chunkyness=1):
         self.chunkyness = chunkyness
@@ -122,9 +125,14 @@
         format = '%%%di' % digits
         return format
 
+
+
+
 class Result(object):
     def __init__(self,completed):
         self.completed=completed
+        self.memo_id=None
+
 
 
 class Result_string(Result):
@@ -147,6 +155,14 @@
         super(Null_result_command,self).__init__(completed=completed)
 
 NULL_RESULT=Null_result_command()
+
+class Result_exception(Result_command):
+    def __init__(self,exception,completed=True):
+        super(Result_exception,self).__init__(completed=completed)
+        self.exception=exception
+
+    def run(self,relax,processor,memos):
+        raise self.exception
 
 
 class Slave_command(object):




Related Messages


Powered by MHonArc, Updated Thu Oct 16 00:40:03 2008