mailr3271 - /branches/multi_processor/multi/mpi4py_processor.py


Other Months | Index by Date | Thread Index
>>   [Date Prev] [Date Next] [Thread Prev] [Thread Next]

Header


Content

Posted by garyt on April 23, 2007 - 13:07:
Author: varioustoxins
Date: Mon Apr 23 13:06:57 2007
New Revision: 3271

URL: http://svn.gna.org/viewcvs/relax?rev=3271&view=rev
Log:
Results output is now threaded to _TRY_ to avoid starving the multiprocessor
while waiting for stdio.

Modified:
    branches/multi_processor/multi/mpi4py_processor.py

Modified: branches/multi_processor/multi/mpi4py_processor.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3271&r1=3270&r2=3271&view=diff
==============================================================================
--- branches/multi_processor/multi/mpi4py_processor.py (original)
+++ branches/multi_processor/multi/mpi4py_processor.py Mon Apr 23 13:06:57 
2007
@@ -29,6 +29,9 @@
 import math
 import textwrap
 import traceback
+import time
+import Queue
+import threading
 
 from multi.processor import Processor,Memo,Slave_command
 from multi.processor import 
Result,Result_command,Result_string,Result_exception
@@ -39,6 +42,11 @@
 
 
 
+
+in_main_loop = False
+
+# save original sys.exit to call after wrapper
+_sys_exit =  sys.exit
 
 
 # load mpi
@@ -60,8 +68,7 @@
     sys.stderr.write('exiting...\n\n')
     sys.exit()
 
-# save original sys.exit to call after wrapper
-_sys_exit =  sys.exit
+
 
 #FIXME: delete me
 #def rank_format_string():
@@ -76,7 +83,17 @@
 def exit(status=None):
 
     if MPI.rank != 0:
-        raise Exception('sys.exit unexpectedley called on slave!')
+        if in_main_loop:
+            raise Exception('sys.exit unexpectedley called on slave!')
+        else:
+            sys.__stderr__.write('\n')
+            
sys.__stderr__.write('***********************************************\n')
+            sys.__stderr__.write('\n')
+            sys.__stderr__.write('warning sys.exit called before mpi4py main 
loop\n')
+            sys.__stderr__.write('\n')
+            
sys.__stderr__.write('***********************************************\n')
+            sys.__stderr__.write('\n')
+            MPI.COMM_WORLD.Abort()
     else:
         #print 'here'
         exit_mpi()
@@ -119,6 +136,41 @@
             processor.process_result(result_command)
 
 
+class Exit_queue_result_command(Result_command):
+    def __init__(self,completed=True):
+        pass
+
+RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command()
+
+class Threaded_result_queue(object):
+    def __init__(self,mpi4py_processor):
+
+        self.queue = Queue.Queue()
+        self.mpi4py_processor = mpi4py_processor
+        self.sleep_time =0.05
+
+        self.running=1
+        # FIXME: syntax error here produces exception but no quit
+        self.thread1 = threading.Thread(target=self.workerThread)
+        self.thread1.setDaemon(1)
+        self.thread1.start()
+
+    def workerThread(self):
+
+            while True:
+                item=self.queue.get()
+                if item == RESULT_QUEUE_EXIT_COMMAND:
+                    break
+                self.mpi4py_processor.process_result(item)
+
+
+    def put(self,job):
+        self.queue.put_nowait(job)
+
+    def run_all(self):
+        self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND)
+        self.thread1.join()
+
 
 
 
@@ -127,7 +179,7 @@
 
 
 
-    def __init__(self,relax_instance, chunkyness=3):
+    def __init__(self,relax_instance, chunkyness=1):
         super(Mpi4py_processor,self).__init__(relax_instance = 
relax_instance, chunkyness=chunkyness)
 
 
@@ -143,8 +195,9 @@
         self.batched_returns=True
         self.result_list=None
 
+        self.threaded_result_processing=True
+
     def abort(self):
-        MPI.Finalize()
         MPI.COMM_WORLD.Abort()
 
     def add_to_queue(self,command,memo=None):
@@ -243,6 +296,10 @@
 
             running_set=set()
             idle_set=set([i for i in range(1,MPI.size)])
+
+            if self.threaded_result_processing:
+                result_queue=Threaded_result_queue(self)
+
 
             while len(queue) != 0:
 
@@ -258,12 +315,18 @@
 
                 while len(running_set) !=0:
                     result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
-                    print result
+                    #print result
 
                     if result.completed:
                         idle_set.add(result.rank)
                         running_set.remove(result.rank)
-                    self.process_result(result)
+                    if self.threaded_result_processing:
+                        result_queue.put(result)
+                    else:
+                        self.process_result(result)
+
+            if self.threaded_result_processing:
+                result_queue.run_all()
 
 
 
@@ -276,7 +339,8 @@
 
     def run(self):
 
-
+        global in_main_loop
+        in_main_loop= True
 
         if self.on_master():
             try:
@@ -285,15 +349,16 @@
                 self.post_run()
             except Exception,e:
                 # check me could be moved outside
-                print e
+                #print e
+                traceback.print_exc(file=sys.stdout)
                 self.abort()
 
             # note this a modified exit that kills all MPI processors
             sys.exit()
         else:
             try:
-
                 while not self.do_quit:
+
                     commands = MPI.COMM_WORLD.Recv(source=0)
 
 
@@ -325,3 +390,4 @@
                 exception_result.rank=MPI.rank
                 MPI.COMM_WORLD.Send(buf=exception_result, dest=0)
 
+    in_main_loop = False




Related Messages


Powered by MHonArc, Updated Thu Apr 26 10:40:05 2007