Author: varioustoxins Date: Thu Mar 29 11:45:22 2007 New Revision: 3243 URL: http://svn.gna.org/viewcvs/relax?rev=3243&view=rev Log: First fully working multi branch with both uniprocessor and mpi4py support communication overhead for 18 residues (test_short.py from chris) with in memory io ~25% Modified: branches/multi_processor/multi/mpi4py_processor.py branches/multi_processor/multi/uni_processor.py branches/multi_processor/prompt/interpreter.py branches/multi_processor/relax branches/multi_processor/specific_fns/model_free.py Modified: branches/multi_processor/multi/mpi4py_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3243&r1=3242&r2=3243&view=diff ============================================================================== --- branches/multi_processor/multi/mpi4py_processor.py (original) +++ branches/multi_processor/multi/mpi4py_processor.py Thu Mar 29 11:45:22 2007 @@ -4,6 +4,7 @@ import sys import os import math +import time,datetime #FIXME: me move top generic command module from maths_fns.mf import Mf @@ -54,7 +55,8 @@ class Result(object): def __init__(self): - self.rank=MPI.rank + pass + class Result_string(Result): #FIXME move result up a level @@ -64,11 +66,12 @@ self.completed=completed class Result_command(Result): - def __init__(self,completed): + def __init__(self,completed,memo_id=None): super(Result_command,self).__init__() self.completed=completed - - def run(self,relax,processor): + self.memo_id=memo_id + + def run(self,relax,processor,memo): pass class Null_result_command(Result_command): @@ -79,12 +82,24 @@ class Slave_command(object): + def __init__(self): + self.memo_id=None + + def set_memo_id(self,memo): + if memo != None: + self.memo_id = memo.memo_id() + else: + self.memo_id=None + def run(self,processor): pass #FIXME do some inheritance class Exit_command(Slave_command): + def __init__(self): + super(Exit_command,self).__init__() + def run(self,processor): processor.return_object(NULL_RESULT) processor.do_quit=True @@ -92,13 +107,59 @@ class Get_name_command(Slave_command): + def __init__(self): + super(Exit_command,self).__init__() + def run(self,processor): msg = processor.get_name() result = Result_string(msg,True) processor.return_object(result) +class Memo(object): + def memo_id(self): + return id(self) + + +#not quit a momento so a memo +class MF_completion_memo(Memo): + def __init__(self,model_free,index,sim_index,run,param_set,scaling): + self.index = index + self.sim_index=sim_index + self.run=run + self.param_set=param_set + self.model_free=model_free + self.scaling=scaling + + +class MF_completion_command(Result_command): + def __init__(self,memo_id,param_vector, func, iter, fc, gc, hc, warning): + super(MF_completion_command,self).__init__(True,memo_id=memo_id) + self.memo_id=memo_id + self.param_vector=param_vector + self.func=func + self.iter=iter + self.fc=fc + self.gc=gc + self.hc=hc + self.warning=warning + + def run(self,relax,processor,memo): + m_f=memo.model_free + m_f.iter_count = 0 + m_f.f_count = 0 + m_f.g_count = 0 + m_f.h_count = 0 + m_f.disassemble_result(param_vector=self.param_vector,func=self.func,iter=self.iter,fc=self.fc, + gc=self.gc,hc=self.hc, warning=self.warning, + run=memo.run, index=memo.index,sim_index=memo.sim_index, + param_set=memo.param_set,scaling=memo.scaling) + + class MF_minimise_command(Slave_command): def __init__(self): + super(MF_minimise_command,self).__init__() + + #!! 'a0':1.0,'mu':0.0001,'eta':0.1, self.minimise_map={'args':(), 'x0':None, 'min_algor':None, 'min_options':None, 'func_tol':1e-25, 'grad_tol':None, 'maxiter':1e6, 'A':None, 'b':'None', 'l':None, 'u':None, 'c':None, 'dc':None, 'd2c':None, @@ -125,12 +186,22 @@ def build_mf(self): return Mf(**self.mf_map) - def do_minimise(self): - return generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map) - + def do_minimise(self,memo): + self.mf = self.build_mf() + results = generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map) + + m_f=memo.model_free + param_vector, func, iter, fc, gc, hc, warning = results + m_f.disassemble_result(param_vector=param_vector,func=func,iter=iter,fc=fc, + gc=gc,hc=hc, warning=warning, + run=memo.run, index=memo.index,sim_index=memo.sim_index, + param_set=memo.param_set,scaling=memo.scaling) def run(self,processor): self.mf = self.build_mf() - self.results = generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map) + results = generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map) + param_vector, func, iter, fc, gc, hc, warning = results + + processor.return_object(MF_completion_command(self.memo_id,param_vector, func, iter, fc, gc, hc, warning)) #FIXME do some inheritance class Mpi4py_processor: @@ -143,6 +214,23 @@ # wrap sys.exit to close down mpi before exiting sys.exit= exit self.do_quit=False + + #FIXME un clone from uniprocessor + #command queue and memo queue + self.command_queue=[] + self.memo_map={} + + def add_to_queue(self,command,memo=None): + self.command_queue.append(command) + if memo != None: + command.set_memo_id(memo) + self.memo_map[memo.memo_id()]=memo + + def run_queue(self): + #FIXME: need a finally here to cleanup exceptions states + self.run_command_queue(self.command_queue) + del self.command_queue[:] + self.memo_map.clear() def assert_on_master(self): if MPI.rank != 0: @@ -157,13 +245,15 @@ exit_mpi() def return_object(self,result): + result.rank=MPI.rank MPI.COMM_WORLD.Send(buf=result, dest=0) - def run_command_queue(self,commands): - self.assert_on_master() - - for i in range(1,MPI.size): - MPI.COMM_WORLD.Send(buf=command,dest=i) + +# def process_commands(self,commands): +# self.assert_on_master() +# +# for i in range(1,MPI.size): +# MPI.COMM_WORLD.Send(buf=command,dest=i) def run_command_globally(self,command): queue = [command for i in range(1,MPI.size)] @@ -201,6 +291,8 @@ while len(running_set) !=0: result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) if isinstance(result, Exception): + #FIXME: clear command queue + # and finalise mpi (or restart it if we can! raise result if isinstance(result, Result): @@ -209,7 +301,13 @@ running_set.remove(result.rank) if isinstance(result, Result_command): - result.run(self.relax,self) + memo=None + if result.memo_id != None: + memo=self.memo_map[result.memo_id] + result.run(self.relax_instance,self,memo) + if result.memo_id != None and result.completed: + del self.memo_map[result.memo_id] + elif isinstance(result, Result_string): #FIXME can't cope with multiple lines print result.rank,result.string @@ -242,7 +340,14 @@ if MPI.rank ==0: + start_time = time.time() self.relax_instance.run() + end_time = time.time() + time_diff= end_time - start_time + time_delta = datetime.timedelta(seconds=time_diff) + sys.stderr.write('overall runtime: ' + time_delta.__str__() + '\n') + sys.stderr.flush() + # note this a mdofied exit that kills all MPI processors sys.exit() else: #self.relax_instance.run(deamon=True) Modified: branches/multi_processor/multi/uni_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/uni_processor.py?rev=3243&r1=3242&r2=3243&view=diff ============================================================================== --- branches/multi_processor/multi/uni_processor.py (original) +++ branches/multi_processor/multi/uni_processor.py Thu Mar 29 11:45:22 2007 @@ -1,14 +1,79 @@ #!/usr/bin/env python +import threading, Queue +import sys +import multi +import time,datetime +#FIXME need to subclass class Uni_processor(object): - def __init__(self,relax_instance): - self.relax_instance= relax_instance + def __init__(self,relax_instance): + self.relax_instance= relax_instance - def run(self): - self.relax_instance.run() + self.command_queue=[] + self.memo_map={} + + + + def add_to_queue(self,command,memo=None): + self.command_queue.append(command) + if memo != None: + command.set_memo_id(memo) + self.memo_map[memo.memo_id()]=memo + + def run_queue(self): + #FIXME: need a finally here to cleanup exceptions states + for command in self.command_queue: + print command + + + self.run_command_queue() + #TODO: add cheques for empty queuese and maps if now warn + del self.command_queue[:] + self.memo_map.clear() + + def run_command_queue(self): + for command in self.command_queue: + command.run(self) + + def run(self): + start_time = time.clock() + self.relax_instance.run() + end_time = time.clock() + time_diff= end_time - start_time + time_delta = datetime.timedelta(seconds=time_diff) + sys.stderr.write('overall runtime: ' + time_delta.__str__() + '\n') + sys.stderr.flush() + + + + + def return_object(self,result): + if isinstance(result, Exception): + #FIXME: clear command queue + # and finalise mpi (or restart it if we can! + raise result + + + + if isinstance(result, multi.mpi4py_processor.Result_command): + memo=None + if result.memo_id != None: + memo=self.memo_map[result.memo_id] + result.run(self.relax_instance,self,memo) + if result.memo_id != None and result.completed: + del self.memo_map[result.memo_id] + + elif isinstance(result, multi.mpi4py_processor.Result_string): + #FIXME can't cope with multiple lines + print result.rank,result.string + else: + message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) + raise Exception(message) + if __name__ == '__main__': test =Uni_processor(None) print test + Modified: branches/multi_processor/prompt/interpreter.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/prompt/interpreter.py?rev=3243&r1=3242&r2=3243&view=diff ============================================================================== --- branches/multi_processor/prompt/interpreter.py (original) +++ branches/multi_processor/prompt/interpreter.py Thu Mar 29 11:45:22 2007 @@ -357,8 +357,9 @@ sys.stdout.write("\n") # Quit. - if quit: - sys.exit() + # FIXME: need to drop off end of interpreter loop to exit cleanly + #if quit: + # sys.exit() def prompt(intro=None, local=None): Modified: branches/multi_processor/relax URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/relax?rev=3243&r1=3242&r2=3243&view=diff ============================================================================== --- branches/multi_processor/relax (original) +++ branches/multi_processor/relax Thu Mar 29 11:45:22 2007 @@ -173,7 +173,6 @@ # Run the interpreter. self.interpreter = Interpreter(self) self.interpreter.run() - elif mode == 'slave': self.interpreter = Interpreter(self) Modified: branches/multi_processor/specific_fns/model_free.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/specific_fns/model_free.py?rev=3243&r1=3242&r2=3243&view=diff ============================================================================== --- branches/multi_processor/specific_fns/model_free.py (original) +++ branches/multi_processor/specific_fns/model_free.py Thu Mar 29 11:45:22 2007 @@ -35,7 +35,7 @@ from maths_fns.mf import Mf from minimise.generic import generic_minimise from float import isNaN,isInf -from multi.mpi4py_processor import MF_minimise_command +from multi.mpi4py_processor import MF_minimise_command,MF_completion_memo class Model_free(Common_functions): @@ -2181,6 +2181,7 @@ print "Unconstrained grid search size: " + `self.grid_size` + " (constraints may decrease this size).\n" # Initialise the iteration counter and function, gradient, and Hessian call counters. + #FIXME: move to processor command self.iter_count = 0 self.f_count = 0 self.g_count = 0 @@ -2395,6 +2396,7 @@ # ri_labels=ri_labels, gx=self.relax.data.gx, gh=self.relax.data.gh, # g_ratio=self.relax.data.g_ratio, h_bar=self.relax.data.h_bar, # mu0=self.relax.data.mu0, num_params=num_params, vectors=xh_unit_vectors) + command=MF_minimise_command() command.set_mf(init_params=self.param_vector, param_set=self.param_set, diff_type=diff_type, diff_params=diff_params, scaling_matrix=self.scaling_matrix, num_res=num_res, @@ -2460,13 +2462,40 @@ command.set_minimise(args=(), x0=self.param_vector, min_algor=min_algor, min_options=min_options, func_tol=func_tol, grad_tol=grad_tol, maxiter=max_iterations, full_output=1, print_flag=print_flag) - command.run(None) - - self.param_vector, self.func, iter, fc, gc, hc, self.warning = command.results + + memo = MF_completion_memo(model_free=self,index=index,sim_index=sim_index,run=self.run,param_set=self.param_set,scaling=scaling) + + self.relax.processor.add_to_queue(command,memo) + #self.relax.processor.add_to_queue() + + #command.do_minimise(memo) + #command.memo_id + + #param_vector, func, iter, fc, gc, hc, warning = command.results + #self.disassemble_result(param_vector=param_vector,func=func,iter=iter,fc=fc,gc=gc,hc=hc,warning=warning, + # run=memo.run,index=memo.index,sim_index=memo.sim_index, param_set=memo.param_set,scaling=memo.scaling) + + self.relax.processor.run_queue() + + + def disassemble_result(self,param_vector,func,iter,fc,gc,hc,warning,run,index,sim_index, param_set,scaling): + self.func=func + self.warning=warning + self.param_vector=param_vector + + #FIXME something is resetting the count between each calculation! +# self.iter_count = iter +# self.f_count = fc +# self.g_count = gc +# self.h_count = hc + self.iter_count = self.iter_count + iter self.f_count = self.f_count + fc self.g_count = self.g_count + gc self.h_count = self.h_count + hc + + + # Catch infinite chi-squared values. if isInf(self.func): @@ -2488,22 +2517,22 @@ # Sequence specific minimisation statistics. if self.param_set == 'mf' or self.param_set == 'local_tm': # Chi-squared statistic. - self.relax.data.res[self.run][i].chi2_sim[sim_index] = self.func + self.relax.data.res[self.run][index].chi2_sim[sim_index] = self.func # Iterations. - self.relax.data.res[self.run][i].iter_sim[sim_index] = self.iter_count + self.relax.data.res[self.run][index].iter_sim[sim_index] = self.iter_count # Function evaluations. - self.relax.data.res[self.run][i].f_count_sim[sim_index] = self.f_count + self.relax.data.res[self.run][index].f_count_sim[sim_index] = self.f_count # Gradient evaluations. - self.relax.data.res[self.run][i].g_count_sim[sim_index] = self.g_count + self.relax.data.res[self.run][index].g_count_sim[sim_index] = self.g_count # Hessian evaluations. - self.relax.data.res[self.run][i].h_count_sim[sim_index] = self.h_count + self.relax.data.res[self.run][index].h_count_sim[sim_index] = self.h_count # Warning. - self.relax.data.res[self.run][i].warning_sim[sim_index] = self.warning + self.relax.data.res[self.run][index].warning_sim[sim_index] = self.warning # Global minimisation statistics. elif self.param_set == 'diff' or self.param_set == 'all': @@ -2530,22 +2559,22 @@ # Sequence specific minimisation statistics. if self.param_set == 'mf' or self.param_set == 'local_tm': # Chi-squared statistic. - self.relax.data.res[self.run][i].chi2 = self.func + self.relax.data.res[self.run][index].chi2 = self.func # Iterations. - self.relax.data.res[self.run][i].iter = self.iter_count + self.relax.data.res[self.run][index].iter = self.iter_count # Function evaluations. - self.relax.data.res[self.run][i].f_count = self.f_count + self.relax.data.res[self.run][index].f_count = self.f_count # Gradient evaluations. - self.relax.data.res[self.run][i].g_count = self.g_count + self.relax.data.res[self.run][index].g_count = self.g_count # Hessian evaluations. - self.relax.data.res[self.run][i].h_count = self.h_count + self.relax.data.res[self.run][index].h_count = self.h_count # Warning. - self.relax.data.res[self.run][i].warning = self.warning + self.relax.data.res[self.run][index].warning = self.warning # Global minimisation statistics. elif self.param_set == 'diff' or self.param_set == 'all':