mailr3243 - in /branches/multi_processor: ./ multi/ prompt/ specific_fns/


Posted by garyt on March 29, 2007 - 12:07:
Author: varioustoxins
Date: Thu Mar 29 11:45:22 2007
New Revision: 3243

URL: http://svn.gna.org/viewcvs/relax?rev=3243&view=rev
Log:
First fully working multi branch with both uniprocessor and mpi4py support.
Communication overhead for 18 residues (test_short.py from Chris) with
in-memory I/O is ~25%.
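
The heart of the change is a small command/memo queue shared by both processors: a Slave_command is queued together with an optional Memo that holds the master-side state needed to post-process the result, and only the memo's id travels with the command. A minimal sketch of that queueing side, using the class names from the diffs below (illustrative only, not the branch code):

class Memo(object):
    """Master-side state that never has to be sent to a slave."""
    def memo_id(self):
        return id(self)

class Slave_command(object):
    def __init__(self):
        self.memo_id = None

    def set_memo_id(self, memo):
        # Only the id travels with the command; the memo stays on the master.
        self.memo_id = memo.memo_id() if memo is not None else None

    def run(self, processor):
        raise NotImplementedError

class Processor(object):
    def __init__(self, relax_instance):
        self.relax_instance = relax_instance
        self.command_queue = []   # commands waiting to run
        self.memo_map = {}        # memo_id -> Memo, consumed on completion

    def add_to_queue(self, command, memo=None):
        self.command_queue.append(command)
        if memo is not None:
            command.set_memo_id(memo)
            self.memo_map[memo.memo_id()] = memo

    def run_queue(self):
        # The FIXME in the diff asks for a finally clause like this one.
        try:
            for command in self.command_queue:
                command.run(self)
        finally:
            del self.command_queue[:]
            self.memo_map.clear()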

Modified:
    branches/multi_processor/multi/mpi4py_processor.py
    branches/multi_processor/multi/uni_processor.py
    branches/multi_processor/prompt/interpreter.py
    branches/multi_processor/relax
    branches/multi_processor/specific_fns/model_free.py

Modified: branches/multi_processor/multi/mpi4py_processor.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3243&r1=3242&r2=3243&view=diff
==============================================================================
--- branches/multi_processor/multi/mpi4py_processor.py (original)
+++ branches/multi_processor/multi/mpi4py_processor.py Thu Mar 29 11:45:22 2007
@@ -4,6 +4,7 @@
 import sys
 import os
 import math
+import time,datetime
 
 #FIXME: me move top generic command module
 from maths_fns.mf import Mf
@@ -54,7 +55,8 @@
 
 class Result(object):
     def __init__(self):
-        self.rank=MPI.rank
+        pass
+
 
 class Result_string(Result):
     #FIXME move result up a level
@@ -64,11 +66,12 @@
         self.completed=completed
 
 class Result_command(Result):
-    def __init__(self,completed):
+    def __init__(self,completed,memo_id=None):
         super(Result_command,self).__init__()
         self.completed=completed
-
-    def run(self,relax,processor):
+        self.memo_id=memo_id
+
+    def run(self,relax,processor,memo):
         pass
 
 class Null_result_command(Result_command):
@@ -79,12 +82,24 @@
 
 
 class Slave_command(object):
+    def __init__(self):
+        self.memo_id=None
+
+    def set_memo_id(self,memo):
+        if memo != None:
+            self.memo_id = memo.memo_id()
+        else:
+            self.memo_id=None
+
     def run(self,processor):
         pass
 
 #FIXME do some inheritance
 
 class Exit_command(Slave_command):
+    def __init__(self):
+        super(Exit_command,self).__init__()
+
     def run(self,processor):
         processor.return_object(NULL_RESULT)
         processor.do_quit=True
@@ -92,13 +107,59 @@
 
 
 class Get_name_command(Slave_command):
+    def __init__(self):
+        super(Get_name_command,self).__init__()
+
     def run(self,processor):
         msg = processor.get_name()
         result = Result_string(msg,True)
         processor.return_object(result)
 
+class Memo(object):
+    def memo_id(self):
+        return id(self)
+
+
+# not quite a memento, so a memo
+class MF_completion_memo(Memo):
+    def __init__(self,model_free,index,sim_index,run,param_set,scaling):
+        self.index = index
+        self.sim_index=sim_index
+        self.run=run
+        self.param_set=param_set
+        self.model_free=model_free
+        self.scaling=scaling
+
+
+class MF_completion_command(Result_command):
+    def __init__(self,memo_id,param_vector, func, iter, fc, gc, hc, warning):
+        super(MF_completion_command,self).__init__(True,memo_id=memo_id)
+        self.memo_id=memo_id
+        self.param_vector=param_vector
+        self.func=func
+        self.iter=iter
+        self.fc=fc
+        self.gc=gc
+        self.hc=hc
+        self.warning=warning
+
+    def run(self,relax,processor,memo):
+        m_f=memo.model_free
+        m_f.iter_count = 0
+        m_f.f_count = 0
+        m_f.g_count = 0
+        m_f.h_count = 0
+        m_f.disassemble_result(param_vector=self.param_vector,func=self.func,iter=self.iter,fc=self.fc,
+                               gc=self.gc,hc=self.hc, warning=self.warning,
+                               run=memo.run, index=memo.index,sim_index=memo.sim_index,
+                               param_set=memo.param_set,scaling=memo.scaling)
+
+
 class MF_minimise_command(Slave_command):
     def __init__(self):
+        super(MF_minimise_command,self).__init__()
+
+
         #!! 'a0':1.0,'mu':0.0001,'eta':0.1,
         self.minimise_map={'args':(), 'x0':None, 'min_algor':None, 'min_options':None, 'func_tol':1e-25, 'grad_tol':None,
                      'maxiter':1e6, 'A':None, 'b':'None', 'l':None, 'u':None, 'c':None, 'dc':None, 'd2c':None,
@@ -125,12 +186,22 @@
     def build_mf(self):
         return  Mf(**self.mf_map)
 
-    def do_minimise(self):
-        return generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map)
-
+    def do_minimise(self,memo):
+        self.mf = self.build_mf()
+        results = generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map)
+
+        m_f=memo.model_free
+        param_vector, func, iter, fc, gc, hc, warning = results
+        m_f.disassemble_result(param_vector=param_vector,func=func,iter=iter,fc=fc,
+                               gc=gc,hc=hc, warning=warning,
+                               run=memo.run, index=memo.index,sim_index=memo.sim_index,
+                               param_set=memo.param_set,scaling=memo.scaling)
     def run(self,processor):
         self.mf = self.build_mf()
-        self.results = generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map)
+        results = generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map)
+        param_vector, func, iter, fc, gc, hc, warning = results
+
+        processor.return_object(MF_completion_command(self.memo_id,param_vector, func, iter, fc, gc, hc, warning))
 
 #FIXME do some inheritance
 class Mpi4py_processor:
@@ -143,6 +214,23 @@
         # wrap sys.exit to close down mpi before exiting
         sys.exit= exit
         self.do_quit=False
+
+        #FIXME un clone from uniprocessor
+        #command queue and memo queue
+        self.command_queue=[]
+        self.memo_map={}
+
+    def add_to_queue(self,command,memo=None):
+        self.command_queue.append(command)
+        if memo != None:
+            command.set_memo_id(memo)
+            self.memo_map[memo.memo_id()]=memo
+
+    def run_queue(self):
+        #FIXME: need a finally here to cleanup exceptions states
+         self.run_command_queue(self.command_queue)
+         del self.command_queue[:]
+         self.memo_map.clear()
 
     def assert_on_master(self):
         if MPI.rank != 0:
@@ -157,13 +245,15 @@
         exit_mpi()
 
     def return_object(self,result):
+        result.rank=MPI.rank
         MPI.COMM_WORLD.Send(buf=result, dest=0)
 
-    def run_command_queue(self,commands):
-        self.assert_on_master()
-
-        for i in range(1,MPI.size):
-            MPI.COMM_WORLD.Send(buf=command,dest=i)
+
+#    def process_commands(self,commands):
+#        self.assert_on_master()
+#
+#        for i in range(1,MPI.size):
+#            MPI.COMM_WORLD.Send(buf=command,dest=i)
 
     def run_command_globally(self,command):
         queue = [command for i in range(1,MPI.size)]
@@ -201,6 +291,8 @@
             while len(running_set) !=0:
                 result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
                 if isinstance(result, Exception):
+                    #FIXME: clear command queue
+                    #       and finalise mpi (or restart it if we can!
                     raise result
 
                 if isinstance(result, Result):
@@ -209,7 +301,13 @@
                         running_set.remove(result.rank)
 
                     if isinstance(result, Result_command):
-                        result.run(self.relax,self)
+                        memo=None
+                        if result.memo_id != None:
+                            memo=self.memo_map[result.memo_id]
+                        result.run(self.relax_instance,self,memo)
+                        if result.memo_id != None and result.completed:
+                            del self.memo_map[result.memo_id]
+
                     elif isinstance(result, Result_string):
                         #FIXME can't cope with multiple lines
                         print result.rank,result.string
@@ -242,7 +340,14 @@
 
 
         if MPI.rank ==0:
+            start_time =  time.time()
             self.relax_instance.run()
+            end_time = time.time()
+            time_diff= end_time - start_time
+            time_delta = datetime.timedelta(seconds=time_diff)
+            sys.stderr.write('overall runtime: ' + time_delta.__str__() + '\n')
+            sys.stderr.flush()
+            # note: this is a modified exit that kills all MPI processes
             sys.exit()
         else:
             #self.relax_instance.run(deamon=True)
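
On the master, the receive loop now routes each returned object by type: exceptions are re-raised, Result_command objects are matched to their memo via memo_id and executed, and Result_string objects are printed. Pulled out of the loop, the dispatch added above amounts to roughly this (a sketch, not a drop-in replacement; the Result classes are the ones defined in this file):

def handle_result(processor, result):
    # Slaves ship exceptions back as objects; re-raise on the master.
    if isinstance(result, Exception):
        # FIXME in the diff: clear the command queue and finalise (or
        # restart) MPI before letting this propagate.
        raise result

    if isinstance(result, Result_command):
        memo = None
        if result.memo_id is not None:
            memo = processor.memo_map[result.memo_id]
        result.run(processor.relax_instance, processor, memo)
        # A completed command has consumed its memo, so drop it.
        if result.memo_id is not None and result.completed:
            del processor.memo_map[result.memo_id]
    elif isinstance(result, Result_string):
        # FIXME in the diff: cannot yet cope with multi-line strings.
        print('%s %s' % (result.rank, result.string))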

Modified: branches/multi_processor/multi/uni_processor.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/uni_processor.py?rev=3243&r1=3242&r2=3243&view=diff
==============================================================================
--- branches/multi_processor/multi/uni_processor.py (original)
+++ branches/multi_processor/multi/uni_processor.py Thu Mar 29 11:45:22 2007
@@ -1,14 +1,79 @@
 #!/usr/bin/env python
 
+import threading, Queue
+import sys
+import multi
+import time,datetime
 
+#FIXME need to subclass
 class Uni_processor(object):
-       def __init__(self,relax_instance):
-               self.relax_instance= relax_instance
+    def __init__(self,relax_instance):
+        self.relax_instance= relax_instance
 
-       def run(self):
-               self.relax_instance.run()
+        self.command_queue=[]
+        self.memo_map={}
+
+
+
+    def add_to_queue(self,command,memo=None):
+        self.command_queue.append(command)
+        if memo != None:
+            command.set_memo_id(memo)
+            self.memo_map[memo.memo_id()]=memo
+
+    def run_queue(self):
+        #FIXME: need a finally here to cleanup exceptions states
+        for command in self.command_queue:
+            print command
+
+
+        self.run_command_queue()
+        #TODO: add checks for empty queues and maps; if not empty, warn
+        del self.command_queue[:]
+        self.memo_map.clear()
+
+    def run_command_queue(self):
+               for command in self.command_queue:
+                       command.run(self)
+
+    def run(self):
+        start_time =  time.clock()
+        self.relax_instance.run()
+        end_time = time.clock()
+        time_diff= end_time - start_time
+        time_delta = datetime.timedelta(seconds=time_diff)
+        sys.stderr.write('overall runtime: ' + time_delta.__str__() + '\n')
+        sys.stderr.flush()
+
+
+
+
+    def return_object(self,result):
+        if isinstance(result, Exception):
+                   #FIXME: clear command queue
+                   #       and finalise mpi (or restart it if we can!
+                   raise result
+
+
+
+        if isinstance(result, multi.mpi4py_processor.Result_command):
+            memo=None
+            if result.memo_id != None:
+                memo=self.memo_map[result.memo_id]
+                result.run(self.relax_instance,self,memo)
+            if result.memo_id != None and result.completed:
+                       del self.memo_map[result.memo_id]
+
+           elif isinstance(result, multi.mpi4py_processor.Result_string):
+               #FIXME can't cope with multiple lines
+               print result.rank,result.string
+           else:
+               message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result)
+               raise Exception(message)
+
 
 
 if __name__ == '__main__':
     test =Uni_processor(None)
     print test
+
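
One detail worth flagging about the two runtime reports: mpi4py_processor.py times relax_instance.run() with time.time() (wall-clock), while the uniprocessor version above uses time.clock(), which under Python 2 on Unix measures CPU time rather than elapsed time, so the two 'overall runtime' figures are not directly comparable. Stripped of context, the report is just:

import sys
import time, datetime

start_time = time.time()   # wall clock; time.clock() would give CPU time on Unix
# ... self.relax_instance.run() goes here ...
time_diff = time.time() - start_time
sys.stderr.write('overall runtime: ' + str(datetime.timedelta(seconds=time_diff)) + '\n')
sys.stderr.flush()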

Modified: branches/multi_processor/prompt/interpreter.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/prompt/interpreter.py?rev=3243&r1=3242&r2=3243&view=diff
==============================================================================
--- branches/multi_processor/prompt/interpreter.py (original)
+++ branches/multi_processor/prompt/interpreter.py Thu Mar 29 11:45:22 2007
@@ -357,8 +357,9 @@
     sys.stdout.write("\n")
 
     # Quit.
-    if quit:
-        sys.exit()
+    # FIXME: need to drop off end of interpreter loop to exit cleanly
+    #if quit:
+    #    sys.exit()
 
 
 def prompt(intro=None, local=None):

Modified: branches/multi_processor/relax
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/relax?rev=3243&r1=3242&r2=3243&view=diff
==============================================================================
--- branches/multi_processor/relax (original)
+++ branches/multi_processor/relax Thu Mar 29 11:45:22 2007
@@ -173,7 +173,6 @@
             # Run the interpreter.
             self.interpreter = Interpreter(self)
             self.interpreter.run()
-
 
         elif mode == 'slave':
             self.interpreter = Interpreter(self)

Modified: branches/multi_processor/specific_fns/model_free.py
URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/specific_fns/model_free.py?rev=3243&r1=3242&r2=3243&view=diff
==============================================================================
--- branches/multi_processor/specific_fns/model_free.py (original)
+++ branches/multi_processor/specific_fns/model_free.py Thu Mar 29 11:45:22 2007
@@ -35,7 +35,7 @@
 from maths_fns.mf import Mf
 from minimise.generic import generic_minimise
 from float import isNaN,isInf
-from multi.mpi4py_processor import  MF_minimise_command
+from multi.mpi4py_processor import  MF_minimise_command,MF_completion_memo
 
 
 class Model_free(Common_functions):
@@ -2181,6 +2181,7 @@
                     print "Unconstrained grid search size: " + `self.grid_size` + " (constraints may decrease this size).\n"
 
             # Initialise the iteration counter and function, gradient, and Hessian call counters.
+            #FIXME: move to processor command
             self.iter_count = 0
             self.f_count = 0
             self.g_count = 0
@@ -2395,6 +2396,7 @@
 #                         ri_labels=ri_labels, gx=self.relax.data.gx, gh=self.relax.data.gh,
 #                         g_ratio=self.relax.data.g_ratio, h_bar=self.relax.data.h_bar,
 #                         mu0=self.relax.data.mu0, num_params=num_params, vectors=xh_unit_vectors)
+
             command=MF_minimise_command()
             command.set_mf(init_params=self.param_vector, param_set=self.param_set, diff_type=diff_type,
                          diff_params=diff_params, scaling_matrix=self.scaling_matrix, num_res=num_res,
@@ -2460,13 +2462,40 @@
                 command.set_minimise(args=(), x0=self.param_vector, min_algor=min_algor, min_options=min_options,
                           func_tol=func_tol, grad_tol=grad_tol, maxiter=max_iterations, full_output=1,
                           print_flag=print_flag)
-            command.run(None)
-
-            self.param_vector, self.func, iter, fc, gc, hc, self.warning = command.results
+
+            memo = MF_completion_memo(model_free=self,index=index,sim_index=sim_index,run=self.run,param_set=self.param_set,scaling=scaling)
+
+            self.relax.processor.add_to_queue(command,memo)
+            #self.relax.processor.add_to_queue()
+
+            #command.do_minimise(memo)
+            #command.memo_id
+
+            #param_vector, func, iter, fc, gc, hc, warning = command.results
+            #self.disassemble_result(param_vector=param_vector,func=func,iter=iter,fc=fc,gc=gc,hc=hc,warning=warning,
+            #                        run=memo.run,index=memo.index,sim_index=memo.sim_index, param_set=memo.param_set,scaling=memo.scaling)
+
+        self.relax.processor.run_queue()
+
+
+    def disassemble_result(self,param_vector,func,iter,fc,gc,hc,warning,run,index,sim_index, param_set,scaling):
+            self.func=func
+            self.warning=warning
+            self.param_vector=param_vector
+
+            #FIXME something is resetting the count between each calculation!
+#            self.iter_count = iter
+#            self.f_count = fc
+#            self.g_count = gc
+#            self.h_count = hc
+
             self.iter_count = self.iter_count + iter
             self.f_count = self.f_count + fc
             self.g_count = self.g_count + gc
             self.h_count = self.h_count + hc
+
+
+
 
             # Catch infinite chi-squared values.
             if isInf(self.func):
@@ -2488,22 +2517,22 @@
                 # Sequence specific minimisation statistics.
                 if self.param_set == 'mf' or self.param_set == 'local_tm':
                     # Chi-squared statistic.
-                    self.relax.data.res[self.run][i].chi2_sim[sim_index] = self.func
+                    self.relax.data.res[self.run][index].chi2_sim[sim_index] = self.func
 
                     # Iterations.
-                    self.relax.data.res[self.run][i].iter_sim[sim_index] = self.iter_count
+                    self.relax.data.res[self.run][index].iter_sim[sim_index] = self.iter_count
 
                     # Function evaluations.
-                    self.relax.data.res[self.run][i].f_count_sim[sim_index] = self.f_count
+                    self.relax.data.res[self.run][index].f_count_sim[sim_index] = self.f_count
 
                     # Gradient evaluations.
-                    self.relax.data.res[self.run][i].g_count_sim[sim_index] = self.g_count
+                    self.relax.data.res[self.run][index].g_count_sim[sim_index] = self.g_count
 
                     # Hessian evaluations.
-                    self.relax.data.res[self.run][i].h_count_sim[sim_index] = self.h_count
+                    self.relax.data.res[self.run][index].h_count_sim[sim_index] = self.h_count
 
                     # Warning.
-                    self.relax.data.res[self.run][i].warning_sim[sim_index] = self.warning
+                    self.relax.data.res[self.run][index].warning_sim[sim_index] = self.warning
 
                 # Global minimisation statistics.
                 elif self.param_set == 'diff' or self.param_set == 'all':
@@ -2530,22 +2559,22 @@
                 # Sequence specific minimisation statistics.
                 if self.param_set == 'mf' or self.param_set == 'local_tm':
                     # Chi-squared statistic.
-                    self.relax.data.res[self.run][i].chi2 = self.func
+                    self.relax.data.res[self.run][index].chi2 = self.func
 
                     # Iterations.
-                    self.relax.data.res[self.run][i].iter = self.iter_count
+                    self.relax.data.res[self.run][index].iter = self.iter_count
 
                     # Function evaluations.
-                    self.relax.data.res[self.run][i].f_count = self.f_count
+                    self.relax.data.res[self.run][index].f_count = self.f_count
 
                     # Gradient evaluations.
-                    self.relax.data.res[self.run][i].g_count = self.g_count
+                    self.relax.data.res[self.run][index].g_count = self.g_count
 
                     # Hessian evaluations.
-                    self.relax.data.res[self.run][i].h_count = self.h_count
+                    self.relax.data.res[self.run][index].h_count = self.h_count
 
                     # Warning.
-                    self.relax.data.res[self.run][i].warning = self.warning
+                    self.relax.data.res[self.run][index].warning = self.warning
 
                 # Global minimisation statistics.
                 elif self.param_set == 'diff' or self.param_set == 'all':
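
Taken together, the model_free.py change replaces the old synchronous command.run(None) with a queue-then-run pattern: each residue's MF_minimise_command is queued with an MF_completion_memo carrying the Model_free instance, residue index, run, parameter set and scaling, and a single run_queue() at the end executes the lot. Inside Model_free.minimise() the calling convention reduces to roughly this (a sketch; num_instances is a stand-in for the real loop, and set_mf()/set_minimise() take the arguments shown in the hunks above):

for index in range(num_instances):    # num_instances is hypothetical here
    command = MF_minimise_command()
    # command.set_mf(...) and command.set_minimise(...) as in the diff above

    memo = MF_completion_memo(model_free=self, index=index, sim_index=sim_index,
                              run=self.run, param_set=self.param_set,
                              scaling=scaling)

    # Queue rather than run: the processor decides whether this executes
    # in-process (Uni_processor) or on an MPI slave (Mpi4py_processor).
    self.relax.processor.add_to_queue(command, memo)

# One blocking call runs the whole queue; each returning MF_completion_command
# is matched to its memo and feeds the results back through disassemble_result().
self.relax.processor.run_queue()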



