Author: bugman Date: Thu Oct 16 00:21:57 2008 New Revision: 7735 URL: http://svn.gna.org/viewcvs/relax?rev=7735&view=rev Log: Manually ported r3268 from the multi_processor branch. The 2 commands used were: svn merge -r3267:3268 svn+ssh://bugman@xxxxxxxxxxx/svn/relax/branches/multi_processor . svn merge -r3267:3268 svn+ssh://bugman@xxxxxxxxxxx/svn/relax/branches/multi_processor/specific_fns/model_free.py specific_fns/model_free/mf_minimise.py ..... r3268 | varioustoxins | 2007-04-17 14:39:25 +0200 (Tue, 17 Apr 2007) | 2 lines Changed paths: M /branches/multi_processor/multi/commands.py M /branches/multi_processor/multi/mpi4py_processor.py M /branches/multi_processor/multi/processor.py M /branches/multi_processor/multi/uni_processor.py M /branches/multi_processor/specific_fns/model_free.py almost feature complete mpi4py and uni processor versions of relax ..... Modified: branches/multi_processor_merge/multi/commands.py branches/multi_processor_merge/multi/mpi4py_processor.py branches/multi_processor_merge/multi/processor.py branches/multi_processor_merge/multi/uni_processor.py branches/multi_processor_merge/specific_fns/model_free/mf_minimise.py Modified: branches/multi_processor_merge/multi/commands.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/commands.py?rev=7735&r1=7734&r2=7735&view=diff ============================================================================== --- branches/multi_processor_merge/multi/commands.py (original) +++ branches/multi_processor_merge/multi/commands.py Thu Oct 16 00:21:57 2008 @@ -32,7 +32,6 @@ import minimise import sys -from mpi4py import MPI from minimise.generic import set_pre_and_post_amble as set_generic_pre_and_post_amble from minimise.grid import set_pre_and_post_amble as set_grid_pre_and_post_amble @@ -228,29 +227,27 @@ processor.return_object(MF_result_command(self.memo_id,param_vector, func, iter, fc, gc, hc, warning,completed=False)) processor.return_object(Result_string(result_string,completed=completed)) + def pre_command_feed_back(self,processor): + self.do_feedback() + + def pre_run(self,processor): #FIXME: move to processor startup + self.save_stdout = sys.stdout self.save_stderr = sys.stderr # add debug flag or extra channels that output immediately - pre_string = processor.rank_format_string() % processor.rank() - sys.stdout = PrependStringIO(pre_string + ' S> ') - sys.stderr = PrependStringIO(pre_string + ' E> ') - - def pre_command_feed_back(self,processor): - self.do_feedback() - - def post_command_feedback(self,results,processor): - pass - - def run_command(self,processor): - self.mf = self.build_mf() - return generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map) - - - def process_result(self,processor): - self.process_results(self.results,processor,completed) + if processor.processor_size() > 1: + pre_string = processor.rank_format_string() % processor.rank() + stderr_string = ' E>' + stdout_string = ' S>' + else: + pre_string = '' + stderr_string = '' + stdout_string = '' + sys.stdout = PrependStringIO(pre_string + stdout_string) + sys.stderr = PrependStringIO(pre_string + stderr_string) def post_run(self,processor): #FIXME: move to processor startup @@ -258,6 +255,19 @@ sys.stderr.close() sys.stdout = self.save_stdout sys.stderr = self.save_stderr + + def post_command_feedback(self,results,processor): + pass + + def run_command(self,processor): + self.mf = self.build_mf() + return generic_minimise(func=self.mf.func, dfunc=self.mf.dfunc, d2func=self.mf.d2func, **self.minimise_map) + + + def process_result(self,processor): + self.process_results(self.results,processor,completed) + + def run(self,processor, completed): @@ -303,7 +313,6 @@ # A = self.minimise_map['A'] # b = self.minimise_map['b'] # - set_generic_pre_and_post_amble(False) set_grid_pre_and_post_amble(False) @@ -450,6 +459,7 @@ grid_size = sgm.grid_size + if sgm.first_time: @@ -478,8 +488,8 @@ if print_flag and results != None: if full_output: - print - print + print '' + print '' print print_prefix + "Parameter values: " + `sgm.xk` print print_prefix + "Function value: " + `sgm.fk` print print_prefix + "Iterations: " + `sgm.k` Modified: branches/multi_processor_merge/multi/mpi4py_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/mpi4py_processor.py?rev=7735&r1=7734&r2=7735&view=diff ============================================================================== --- branches/multi_processor_merge/multi/mpi4py_processor.py (original) +++ branches/multi_processor_merge/multi/mpi4py_processor.py Thu Oct 16 00:21:57 2008 @@ -22,7 +22,8 @@ # # ################################################################################ -#TODO clone communicators & resize +# TODO: clone communicators & resize +# TODO: check exceptiosn on master import sys import os import math @@ -169,8 +170,9 @@ result.rank=MPI.rank MPI.COMM_WORLD.Send(buf=result, dest=0) - - + #FIXME: fill out + def process_result(self): + pass def run_command_queue(self,queue): self.assert_on_master() @@ -248,16 +250,10 @@ last_command = len(commands)-1 for i,command in enumerate(commands): try: - completed = i == last_command + completed = (i == last_command) command.run(self,completed) #raise Exception('dummy') except Exception,e: #self.return_object(e) self.return_object(Capturing_exception(rank=self.rank(),name=self.get_name())) - - - - - - Modified: branches/multi_processor_merge/multi/processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/processor.py?rev=7735&r1=7734&r2=7735&view=diff ============================================================================== --- branches/multi_processor_merge/multi/processor.py (original) +++ branches/multi_processor_merge/multi/processor.py Thu Oct 16 00:21:57 2008 @@ -22,7 +22,8 @@ # # ################################################################################ -#FIXME better requirement of inherited commands +# FIXME better requirement of inherited commands +# TODO: check exceptiosn on master import time,datetime,math,sys from multi.PrependStringIO import PrependStringIO,PrependOut import traceback,textwrap @@ -54,6 +55,7 @@ def get_name(self): raise_unimplimented(self.get_name) + # FIXME is this used? def exit(self): raise_unimplimented(self.exit) @@ -68,6 +70,10 @@ def processor_size(self): raise_unimplimented(self.processor_size()) + + def restore_stdio(self): + sys.stderr = self.save_stderr + sys.stdout = self.save_stdout def run_command_globally(self,command): queue = [command for i in range(1,MPI.size)] @@ -84,9 +90,12 @@ self.save_stdout = sys.stdout self.save_stderr = sys.stderr - pre_string = 'M'*self.rank_format_string_width() - sys.stdout = PrependOut(pre_string + ' S> ', sys.stdout) - sys.stderr = PrependOut(pre_string + ' E> ', sys.stderr) + + if self.processor_size() > 1: + + pre_string = 'M'*self.rank_format_string_width() + sys.stdout = PrependOut(pre_string + ' S> ', sys.stdout) + sys.stderr = PrependOut(pre_string + ' E> ', sys.stderr) def get_time_delta(self,start_time,end_time): @@ -101,8 +110,9 @@ end_time = time.time() time_delta_str = self.get_time_delta(self.start_time,end_time) print 'overall runtime: ' + time_delta_str + '\n' - sys.stdout = self.save_stdout - sys.stderr = self.save_stderr + + if self.processor_size() > 1: + self.restore_stdio() def rank_format_string_width(self): return int(math.ceil(math.log10(self.processor_size()))) Modified: branches/multi_processor_merge/multi/uni_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/uni_processor.py?rev=7735&r1=7734&r2=7735&view=diff ============================================================================== --- branches/multi_processor_merge/multi/uni_processor.py (original) +++ branches/multi_processor_merge/multi/uni_processor.py Thu Oct 16 00:21:57 2008 @@ -22,7 +22,7 @@ # # ################################################################################ import threading, Queue -import sys +import sys,os import multi from multi.processor import Processor,Result_command,Result_string @@ -69,15 +69,13 @@ #FIXME need to subclass class Uni_processor(Processor): + + def __init__(self,relax_instance): self.relax_instance= relax_instance self.command_queue=[] self.memo_map={} - - - def on_master(self): - return True def add_to_queue(self,command,memo=None): self.command_queue.append(command) @@ -86,57 +84,81 @@ self.memo_map[memo.memo_id()]=memo def run_queue(self): - #FIXME: need a finally here to cleanup exceptions states + #FIXME: need a finally here to cleanup exceptions states for windows etc - - for command in self.command_queue: - command.run(self) + last_command = len(self.command_queue)-1 + for i,command in enumerate(self.command_queue): + completed = (i == last_command) + command.run(self,completed) #self.run_command_queue() #TODO: add cheques for empty queuese and maps if now warn del self.command_queue[:] self.memo_map.clear() # FIXME: remove me # def run_command_queue(self): -# for command in self.command_queue: -# command.run(self) +# for command in self.command_queue: +# command.run(self) def run(self): # start_time = time.clock() - self.pre_run() - self.relax_instance.run() - self.post_run() + try: + self.pre_run() + self.relax_instance.run() + finally: + self.post_run() # end_time = time.clock() # time_diff= end_time - start_time # time_delta = datetime.timedelta(seconds=time_diff) # print 'overall runtime: ' + time_delta.__str__() + '\n' + def get_name(self): + # FIXME may need system dependent changes + return '%s-%s' % (os.getenv('HOSTNAME'),os.getpid()) + + def exit(self): + sys.exit() + + def on_master(self): + return True + + + def rank(self): + return 1 + + def processor_size(self): + return 1 + + + def return_object(self,result): + + local_save_stdout = sys.stdout + local_save_stderr = sys.stderr + self.restore_stdio() + if isinstance(result, Exception): - #FIXME: clear command queue + #FIXME: clear command queue # and finalise mpi (or restart it if we can! - raise result + raise result + elif isinstance(result, Result_command): + memo=None + if result.memo_id != None: + memo=self.memo_map[result.memo_id] + result.run(self.relax_instance,self,memo) + if result.memo_id != None and result.completed: + del self.memo_map[result.memo_id] + + elif isinstance(result, Result_string): + self.save_stdout.write(result.string) + else: + message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) + raise Exception(message) + sys.stdout = local_save_stdout + sys.stderr = local_save_stderr - if isinstance(result, Result_command): - memo=None - if result.memo_id != None: - memo=self.memo_map[result.memo_id] - result.run(self.relax_instance,self,memo) - if result.memo_id != None and result.completed: - del self.memo_map[result.memo_id] - - elif isinstance(result, Result_string): - #FIXME can't cope with multiple lines - print result.rank,result.string - else: - message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) - raise Exception(message) - - - - Modified: branches/multi_processor_merge/specific_fns/model_free/mf_minimise.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/specific_fns/model_free/mf_minimise.py?rev=7735&r1=7734&r2=7735&view=diff ============================================================================== --- branches/multi_processor_merge/specific_fns/model_free/mf_minimise.py (original) +++ branches/multi_processor_merge/specific_fns/model_free/mf_minimise.py Thu Oct 16 00:21:57 2008 @@ -978,7 +978,7 @@ # Minimisation. ############### #FIXME??? strange contraints - if match('^[Gg]rid', min_algor) and self.param_set == 'all': + if match('^[Gg]rid', min_algor) and self.param_set == 'diff' : processors = self.relax.processor.processor_size() full_grid_info = Grid_info(min_options) sub_grid_list = full_grid_info.sub_divide(self.relax.processor.processor_size())