mailr3278 - in /branches/multi_processor: multi/mpi4py_processor.py multi/processor.py multi/uni_processor.py relax


Other Months | Index by Date | Thread Index
>>   [Date Prev] [Date Next] [Thread Prev] [Thread Next]

Header


Content

Posted by garyt on May 01, 2007 - 00:24:
Author: varioustoxins
Date: Tue May  1 00:24:05 2007
New Revision: 3278

URL: http://svn.gna.org/viewcvs/relax?rev=3278&view=rev
Log:
Correct the use of the -n command line option; work towards a general processor
base class and simple processor implementations.

Modified:
    branches/multi_processor/multi/mpi4py_processor.py
    branches/multi_processor/multi/processor.py
    branches/multi_processor/multi/uni_processor.py
    branches/multi_processor/relax

Modified: branches/multi_processor/multi/mpi4py_processor.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3278&r1=3277&r2=3278&view=diff
==============================================================================
--- branches/multi_processor/multi/mpi4py_processor.py (original)
+++ branches/multi_processor/multi/mpi4py_processor.py Tue May  1 00:24:05 2007
@@ -28,12 +28,10 @@
 import os
 import math
 import textwrap
-import traceback
-import time
 import Queue
 import threading
 
-from multi.processor import Processor,Memo,Slave_command
+from multi.processor import Processor
 from multi.processor import 
Result,Result_command,Result_string,Result_exception
 from multi.commands import Exit_command
 
@@ -142,6 +140,7 @@
         pass
 
 RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command()
+#FIXME: move  up a level or more
 class Result_queue(object):
     def __init__(self,mpi4py_processor):
         self.mpi4py_processor = mpi4py_processor
@@ -153,6 +152,7 @@
     def run_all(self):
         raise_unimplimented(self.run_all)
 
+#FIXME: move  up a level or more
 class Threaded_result_queue(Result_queue):
     def __init__(self,mpi4py_processor):
         super(Threaded_result_queue,self).__init__(mpi4py_processor)
@@ -182,6 +182,7 @@
         self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND)
         self.thread1.join()
 
+#FIXME: move  up a level or more
 class Immediate_result_queue(Result_queue):
     def __init(self,mpi4py_processor):
         super(Threaded_result_queue,self).__init__(mpi4py_processor)
@@ -193,13 +194,31 @@
     def run_all(self):
         pass
 
+class Too_few_slaves_exception(Exception):
+    def __init__(self):
+        msg = 'mpi4py requires at least 2 mpi processors to run you only 
provided 1, exiting....'
+        Exception.__init__(self,msg)
+
 #FIXME: do some inheritance
 class Mpi4py_processor(Processor):
 
 
 
-    def __init__(self,callback):
-        super(Mpi4py_processor,self).__init__(callback=callback)
+    def __init__(self,processor_size,callback):
+        mpi_processor_size=MPI.size-1
+
+        if processor_size ==  -1:
+            processor_size = mpi_processor_size
+
+        # FIXME: needs better support in relax generates stack trace
+        if mpi_processor_size == 0:
+            raise Too_few_slaves_exception()
+
+        msg = 'warning: mpi4py_processor is using 1 masters and %d slave 
processors you requested %d slaves\n'
+        if processor_size != (mpi_processor_size):
+            print msg % (mpi_processor_size,processor_size)
+
+        
super(Mpi4py_processor,self).__init__(processor_size=mpi_processor_size,callback=callback)
 
 
 
@@ -220,6 +239,7 @@
     def abort(self):
         MPI.COMM_WORLD.Abort()
 
+    #TODO: move up a level
     def add_to_queue(self,command,memo=None):
         self.command_queue.append(command)
         if memo != None:
@@ -229,14 +249,13 @@
     def rank(self):
         return MPI.rank
 
-    def processor_size(self):
-        return MPI.size -1
-
+
+    #TODO: MAY NEED support for widths?
     def get_intro_string(self):
         version_info = MPI.Get_version()
-        return '''MPI running via mpi4py with %d slave processors, mpi 
version = %s.%s''' % (self.processor_size(),version_info[0],version_info[1])
-
-
+        return '''MPI running via mpi4py with %d slave processors & 1 
master, mpi version = %s.%s''' % 
(self.processor_size(),version_info[0],version_info[1])
+
+    #TODO: move up a level
     def chunk_queue(self,queue):
         lqueue=copy(queue)
         result = []
@@ -254,6 +273,7 @@
                 result[i].append(elem)
         return result
 
+    #TODO: move up a level
     def run_queue(self):
         #FIXME: need a finally here to cleanup exceptions states
          lqueue =  self.chunk_queue(self.command_queue)
@@ -262,6 +282,7 @@
          del self.command_queue[:]
          self.memo_map.clear()
 
+    #TODO: move up a level
     def assert_on_master(self):
         if self.on_slave():
             msg = 'running on slave when expected master with MPI.rank == 0, 
rank was %d'% MPI.rank
@@ -274,7 +295,11 @@
     # CHECKME am i used
     def exit(self):
         exit_mpi()
-
+    #TODO:  move to multiprocessor level
+    def return_result_command(self,result_object):
+        MPI.COMM_WORLD.Send(buf=result_object, dest=0)
+
+    #TODO: move up a level add send and revieve virtual functions
     def return_object(self,result):
 
         result_object = None
@@ -298,7 +323,8 @@
         if result_object != None:
             #FIXME check is used?
             result_object.rank=MPI.rank
-            MPI.COMM_WORLD.Send(buf=result_object, dest=0)
+            self.return_result_command(result_object=result_object)
+
 
 #    def queue_result_processing(self,result):
 #        # exceptions are handled instantly not queued to avoid deadlock!
@@ -331,6 +357,7 @@
             message = 'Unexpected result type \n%s \nvalue%s' 
%(result.__class__.__name__,result)
             raise Exception(message)
 
+    #TODO: move up a level  add virtaul send and revieve functions
     def run_command_queue(self,queue):
             self.assert_on_master()
 
@@ -348,14 +375,15 @@
                     if len(queue) != 0:
                         command = queue.pop()
                         dest=idle_set.pop()
-                        MPI.COMM_WORLD.Send(buf=command,dest=dest)
+                        self.master_queue_command(command=command,dest=dest)
                         running_set.add(dest)
                     else:
                         break
 
 
                 while len(running_set) !=0:
-                    result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
+                    result = self.master_recieve_result()
+
                     #if isinstance(result, Result_exception):
                     #    print 'result', result
                     #    sys.exit()
@@ -368,21 +396,30 @@
 
             if self.threaded_result_processing:
                 result_queue.run_all()
-
-
-
+    #TODO:  move to multiprocessor level
+    def master_queue_command(self,command,dest):
+        MPI.COMM_WORLD.Send(buf=command,dest=dest)
+    #TODO:  move to multiprocessor level
+    def master_recieve_result(self):
+        return MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
+    #TODO: move up a level
     def on_master(self):
         result = False
         if MPI.rank ==0:
             result = True
         return result
-
+    #TODO: move up a level
     def print_message(self,message):
         f=open ('error' + `self.rank()` + '.txt','a')
         f.write(message+'\n')
         f.flush()
         f.close()
 
+    #TODO:  move to multiprocessor level
+    def slave_recieve_commands(self):
+        return MPI.COMM_WORLD.Recv(source=0)
+
+    #TODO: move up a level and add virtual send and recieve
     def run(self):
 
         global in_main_loop
@@ -405,7 +442,8 @@
             while not self.do_quit:
                 try:
 
-                    commands = MPI.COMM_WORLD.Recv(source=0)
+                    commands = self.slave_recieve_commands()
+
 
 
                     if not isinstance(commands,list):

Modified: branches/multi_processor/multi/processor.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/processor.py?rev=3278&r1=3277&r2=3278&view=diff
==============================================================================
--- branches/multi_processor/multi/processor.py (original)
+++ branches/multi_processor/multi/processor.py Tue May  1 00:24:05 2007
@@ -70,12 +70,13 @@
 class Processor(object):
 
     #FIXME: remname chunk* grain*
-    def __init__(self,callback):
+    def __init__(self,processor_size,callback):
         self.callback=callback
         self.chunkyness=1
         self.pre_queue_command=None
         self.post_queue_command=None
         self.NULL_RESULT=Null_result_command(processor=self)
+        self._processor_size=processor_size
 
     def add_to_queue(self,command,memo=None):
          raise_unimplimented(self.add_to_queue)
@@ -106,7 +107,7 @@
         raise_unimplimented(self.rank)
 
     def processor_size(self):
-        raise_unimplimented(self.processor_size())
+        return self._processor_size
 
     def get_intro_string(self):
         raise_unimplimented(self.get_intro_string)

Modified: branches/multi_processor/multi/uni_processor.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/uni_processor.py?rev=3278&r1=3277&r2=3278&view=diff
==============================================================================
--- branches/multi_processor/multi/uni_processor.py (original)
+++ branches/multi_processor/multi/uni_processor.py Tue May  1 00:24:05 2007
@@ -21,6 +21,7 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  
  #
 #                                                                            
  #
 
################################################################################
+from errors import RelaxWarnings
 import threading, Queue
 import sys,os
 import multi
@@ -71,8 +72,12 @@
 class Uni_processor(Processor):
 
 
-    def __init__(self,callback):
-        super(Uni_processor,self).__init__(callback=callback)
+    def __init__(self,processor_size,callback):
+        
super(Uni_processor,self).__init__(processor_size=1,callback=callback)
+
+        if processor_size > 1:
+            print 'warning: uniprocessor can only support 1 processor you 
requested %d' % processor_size
+            print 'continuing...\n'
 
 
         self.command_queue=[]

Modified: branches/multi_processor/relax
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/relax?rev=3278&r1=3277&r2=3278&view=diff
==============================================================================
--- branches/multi_processor/relax (original)
+++ branches/multi_processor/relax Tue May  1 00:24:05 2007
@@ -250,10 +250,14 @@
 #        parser.add_option('--thread', action='store_true', dest='thread', 
default=0, help='run relax in threading mode (this mode should not be invoked 
by a user)')
         parser.add_option('-v', '--version', action='store_true', 
dest='version', default=0, help='show the version number and exit')
         parser.add_option('-m', '--multi', action='store', type='string', 
dest='multiprocessor', default='uni', help='set multi processor method')
-        parser.add_option('-n', '--processors', action='store', type='int', 
dest='n_processors', default=1, help='set number of processors (may be 
ignored)')
+        parser.add_option('-n', '--processors', action='store', type='int', 
dest='n_processors', default=-1, help='set number of processors (may be 
ignored)')
+
+
+
 
         # Parse the options.
         (options, args) = parser.parse_args(args)
+
 
         # Debugging flag.
         if options.debug:
@@ -564,7 +568,7 @@
 #FIXME error checking for if module require not found
 #FIXME move module loading to processor
 #FIXME module loading code needs to be in a util module
-def load_multiprocessor(callback):
+def load_multiprocessor(callback, processor_size):
 
     processor_name = relax_instance.multiprocessor_type + '_processor'
     class_name= processor_name[0].upper() + processor_name[1:]
@@ -579,7 +583,7 @@
     else:
         raise RelaxError("can't load class %s from module %s" % 
(class_name,module_path))
 
-    object = clazz(callback)
+    object = clazz(callback=callback,processor_size=processor_size)
     relax_instance.processor =  object
     return object
 
@@ -593,9 +597,9 @@
 
     if not profile_flag:
         relax_instance = Relax()
-        mode=relax_instance.arguments(sys.argv[1:])
+        relax_instance.arguments(sys.argv[1:])
         callbacks =  Application_callback(master=relax_instance)
-        multi_processor = load_multiprocessor(callbacks)
+        multi_processor = load_multiprocessor(callbacks, 
processor_size=relax_instance.n_processors)
         multi_processor.run()
 
     else:




Related Messages


Powered by MHonArc, Updated Wed May 02 00:40:05 2007