mailr7716 - in /branches/multi_processor_merge: multi/mpi4py_processor.py relax


Posted by edward on October 15, 2008 - 22:05:
Author: bugman
Date: Wed Oct 15 22:05:14 2008
New Revision: 7716

URL: http://svn.gna.org/viewcvs/relax?rev=7716&view=rev
Log:
Manually ported r3239 from the multi_processor branch.

The command used was:
svn merge -r3238:3239 svn+ssh://bugman@xxxxxxxxxxx/svn/relax/branches/multi_processor .

.....
  r3239 | varioustoxins | 2007-03-22 09:20:55 +0100 (Thu, 22 Mar 2007) | 3 lines
  Changed paths:
     M /branches/multi_processor/multi/mpi4py_processor.py
     M /branches/multi_processor/relax

  multi processor fixes for proper command queuing with optional
  load balancing, exception support and segmented results
.....
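
For context, the load balancing that this ported revision introduces (the
master keeps an idle set and a running set of slaves, handing out queued
commands as completed results come back) can be sketched outside of MPI.
This is a minimal, self-contained illustration; FakeTransport and
Square_command are hypothetical stand-ins for MPI.COMM_WORLD and a real
Slave_command, not part of the commit:

import collections

class Result(object):
    """Tags a result with the rank of the slave that produced it."""
    def __init__(self, rank, completed):
        self.rank = rank
        self.completed = completed

class Square_command(object):
    """Toy stand-in for a Slave_command: squares a number."""
    def __init__(self, x):
        self.x = x
    def run(self, rank):
        result = Result(rank, completed=True)
        result.value = self.x * self.x
        return result

class FakeTransport(object):
    """In-process stand-in for MPI point-to-point Send/Recv."""
    def __init__(self):
        self.inbox = collections.deque()
    def send(self, command, dest):
        # The fake "slave" runs the command at once and queues its result.
        self.inbox.append(command.run(dest))
    def recv(self):
        # Stands in for Recv(source=MPI.ANY_SOURCE).
        return self.inbox.popleft()

def run_command_queue(queue, transport, n_slaves):
    """Load-balanced dispatch, mirroring the master loop in this commit."""
    idle_set = set(range(1, n_slaves + 1))
    running_set = set()
    values = []
    while queue:
        # Hand out work while both commands and idle slaves remain.
        while idle_set and queue:
            dest = idle_set.pop()
            transport.send(queue.pop(), dest=dest)
            running_set.add(dest)
        # Drain results; a completed result frees its slave for more work.
        while running_set:
            result = transport.recv()
            if isinstance(result, Exception):
                raise result  # slave-side failures propagate to the master
            if result.completed:
                running_set.remove(result.rank)
                idle_set.add(result.rank)
            values.append(result.value)
    return values

if __name__ == '__main__':
    commands = [Square_command(x) for x in range(10)]
    print(run_command_queue(commands, FakeTransport(), n_slaves=3))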


Modified:
    branches/multi_processor_merge/multi/mpi4py_processor.py
    branches/multi_processor_merge/relax

Modified: branches/multi_processor_merge/multi/mpi4py_processor.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/mpi4py_processor.py?rev=7716&r1=7715&r2=7716&view=diff
==============================================================================
--- branches/multi_processor_merge/multi/mpi4py_processor.py (original)
+++ branches/multi_processor_merge/multi/mpi4py_processor.py Wed Oct 15 22:05:14 2008
@@ -1,7 +1,9 @@
 #!/usr/bin/env python
 
+#TODO clone communicators & resize
 import sys
 import os
+import math
 
 
 # load mpi
@@ -15,27 +17,80 @@
 if MPI.rank == 0:
     _sys_exit =  sys.exit
 
+
+def rank_format_string():
+    digits  = math.ceil(math.log10(MPI.size))
+    format = '%%%di' % digits
+    return format
+
+RANK_FORMAT_STRING = rank_format_string
+
 # wrapper sys.exit function
 def exit(status=None):
 
     exit_mpi()
     _sys_exit(status)
 
+def broadcast_command(command):
+    for i in range(1,MPI.size):
+        if i != 0:
+            MPI.COMM_WORLD.Send(buf=command,dest=i)
+
+def ditch_all_results():
+    for i in range(1,MPI.size):
+        if i != 0:
+            while 1:
+                result = MPI.COMM_WORLD.Recv(source=i)
+                if result.completed:
+                    break
 def exit_mpi():
     if MPI.Is_initialized() and not MPI.Is_finalized() and MPI.rank == 0:
-        sendbuf = Exit_command()
-        for i in range(MPI.size):
-            if i != 0:
-                MPI.COMM_WORLD.Send(buf=sendbuf,dest=i)
+        broadcast_command(Exit_command())
+        ditch_all_results()
+
+
+class Result(object):
+    def __init__(self):
+        self.rank=MPI.rank
+
+class Result_string(Result):
+    #FIXME move result up a level
+    def __init__(self,string,completed):
+        super(Result_string,self).__init__()
+        self.string=string
+        self.completed=completed
+
+class Result_command(Result):
+    def __init__(self,completed):
+        super(Result_command,self).__init__()
+        self.completed=completed
+
+    def run(self,relax,processor):
+        pass
+
+class Null_result_command(Result_command):
+    def __init__(self):
+        super(Null_result_command,self).__init__(completed=True)
+
+NULL_RESULT=Null_result_command()
+
+class Slave_command(object):
+    def run(self,processor):
+        pass
 
 #FIXME do some inheritance
-class Exit_command(object):
-    def run(self,relax,processor):
+
+class Exit_command(Slave_command):
+    def run(self,processor):
+        processor.return_object(NULL_RESULT)
         processor.do_quit=True
 
-class Get_name_command(object):
-    def run(self,relax,processor):
-        result = '%s-%s' % (MPI.Get_processor_name(),os.getpid())
+
+
+class Get_name_command(Slave_command):
+    def run(self,processor):
+        msg = processor.get_name()
+        result = Result_string(msg,True)
         processor.return_object(result)
 
 #FIXME do some inheritance
@@ -50,25 +105,78 @@
         sys.exit= exit
         self.do_quit=False
 
+    def assert_on_master(self):
+        if MPI.rank != 0:
+            msg = 'running on slave when expected master with MPI.rank == 0, rank was %d'% MPI.rank
+            raise Exception(msg)
+
+
+    def get_name(self):
+        return '%s-%s' % (MPI.Get_processor_name(),os.getpid())
+
     def exit(self):
         exit_mpi()
 
     def return_object(self,result):
         MPI.COMM_WORLD.Send(buf=result, dest=0)
 
-    def run_command(self,command):
+    def run_command_queue(self,commands):
+        self.assert_on_master()
+
         for i in range(1,MPI.size):
-            if i != 0:
-                MPI.COMM_WORLD.Send(buf=command,dest=i)
-        for i in range(1,MPI.size):
-            buf=[]
-            if i !=0:
-                elem = MPI.COMM_WORLD.Recv(source=i)
-                if type(elem) == 'object':
-                    elem.run(relax_instance, relax_instance.processor)
+            MPI.COMM_WORLD.Send(buf=command,dest=i)
+
+    def run_command_globally(self,command):
+        queue = [command for i in range(1,MPI.size)]
+        self.run_command_queue(queue)
+
+    def run_command_queue(self,queue):
+        self.assert_on_master()
+
+#        for i in range(1,MPI.size):
+#                MPI.COMM_WORLD.Send(buf=command,dest=i)
+#        for i in range(1,MPI.size):
+#            elem = MPI.COMM_WORLD.Recv(source=i)
+#            if type(elem) == 'object':
+#                elem.run(relax_instance, relax_instance.processor)
+#            else:
+#                #FIXME can't cope with multiple lines
+#                print i,elem
+        #queue = [command for i in range(1,MPI.size*2)]
+
+        running_set=set()
+        idle_set=set([i for i in range(1,MPI.size)])
+
+        while len(queue) != 0:
+
+            while len(idle_set) != 0:
+                if len(queue) != 0:
+                    command = queue.pop()
+                    dest=idle_set.pop()
+                    MPI.COMM_WORLD.Send(buf=command,dest=dest)
+                    running_set.add(dest)
                 else:
-                    #FIXME can't cope with multiple lines
-                    print i,elem
+                    break
+
+
+            while len(running_set) !=0:
+                result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
+                if isinstance(result, Exception):
+                    raise result
+
+                if isinstance(result, Result):
+                    if result.completed:
+                        idle_set.add(result.rank)
+                        running_set.remove(result.rank)
+
+                    if isinstance(result, Result_command):
+                        result.run(self.relax,self)
+                    elif isinstance(result, Result_string):
+                        #FIXME can't cope with multiple lines
+                        print result.rank,result.string
+                    else:
+                        message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result)
+                        raise Exception(message)
 
 
 #        for i in range(MPI.size):
@@ -84,14 +192,14 @@
 
     def run(self):
 
-        if MPI.rank == 0:
-            self.relax_instance.multi_mode='multi_master'
-        else:
-            self.relax_instance.multi_mode='multi_slave'
-            self.relax_instance.mode='slave'
-            self.relax_instance.script_file=None
-            self.relax_instance.dummy_mode=True
-            self.relax_instance.run()
+#        if MPI.rank == 0:
+#            self.relax_instance.multi_mode='multi_master'
+#        else:
+#            self.relax_instance.multi_mode='multi_slave'
+#            self.relax_instance.mode='slave'
+#            self.relax_instance.script_file=None
+#            self.relax_instance.dummy_mode=True
+#            #self.relax_instance.run()
 
 
         if MPI.rank ==0:
@@ -101,7 +209,11 @@
             #self.relax_instance.run(deamon=True)
             while not self.do_quit:
                 command = MPI.COMM_WORLD.Recv(source=0)
-                command.run(self.relax_instance, self.relax_instance.processor)
+                try:
+                    command.run(self)
+                except Exception,e:
+                    self.return_object(e)
+
 
 
             #if data=='close':

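
The slave side of the new protocol (the final hunk above) pairs with that
queue: each command runs inside a try block and any exception is shipped back
to the master as the result object, where the scheduling loop re-raises it. A
compact sketch, with comm as a hypothetical MPI-like handle rather than the
real mpi4py API:

def slave_loop(processor, comm):
    """Sketch of the slave event loop after this change."""
    while not processor.do_quit:
        # Block until the master sends the next command.
        command = comm.recv(source=0)
        try:
            command.run(processor)      # Slave_command.run(processor)
        except Exception as e:
            # Forward the failure; the master's queue loop re-raises it.
            processor.return_object(e)
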
Modified: branches/multi_processor_merge/relax
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/relax?rev=7716&r1=7715&r2=7716&view=diff
==============================================================================
--- branches/multi_processor_merge/relax (original)
+++ branches/multi_processor_merge/relax Wed Oct 15 22:05:14 2008
@@ -107,12 +107,14 @@
 
         #FIXME use self.mode all over
         mode = self.mode
+        print mode
 
         # Show the version number and exit.
         if mode == 'version':
             print 'relax ' + self.version
             sys.exit()
 
+        # FIXME threading
         # Logging.
         if self.log_file:
             log(self.log_file)
@@ -129,7 +131,7 @@
             # Run the interpreter.
             self.interpreter = interpreter.Interpreter(self, intro_string)
             self.interpreter.run(self.script_file)
-            print 'exit'
+
 
         elif mode == 'slave':
             self.interpreter = Interpreter(self)
@@ -466,14 +468,14 @@
     module = None
     result = None
 
-    try:
-        module = __import__(module_path,globals(),  locals(), [])
-        if verbose:
-            print 'loaded module %s' % module_path
-    except Exception, e:
-        if verbose:
-            print 'failed to load module_path %s' % module_path
-            print 'exception:',e
+    #try:
+    module = __import__(module_path,globals(),  locals(), [])
+    if verbose:
+        print 'loaded module %s' % module_path
+    #except Exception, e:
+    #    if verbose:
+    #        print 'failed to load module_path %s' % module_path
+    #        print 'exception:',e
 
     #FIXME: needs more failure checking
     if module != None:
@@ -494,6 +496,7 @@
 
     modules = import_module(module_path)
     #print modules
+
     if hasattr(modules[-1],class_name):
         clazz =  getattr(modules[-1], class_name)
     else:

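The loader touched at the end of this diff follows the usual
import-then-getattr pattern for resolving a class by name. A minimal sketch
under that assumption (load_class is a hypothetical name, and importlib
stands in for the raw __import__ call seen above):

import importlib

def load_class(module_path, class_name, verbose=False):
    """Resolve class_name from the module at module_path."""
    module = importlib.import_module(module_path)
    if verbose:
        print('loaded module %s' % module_path)
    if not hasattr(module, class_name):
        raise AttributeError("%s has no class named '%s'"
                             % (module_path, class_name))
    return getattr(module, class_name)

# For example, the processor class might be resolved as:
# processor_class = load_class('multi.mpi4py_processor', 'Mpi4py_processor')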


