Author: varioustoxins Date: Tue May 1 00:24:05 2007 New Revision: 3278 URL: http://svn.gna.org/viewcvs/relax?rev=3278&view=rev Log: correct use of -n command line option, work to give general processor base class and simple processor implementations Modified: branches/multi_processor/multi/mpi4py_processor.py branches/multi_processor/multi/processor.py branches/multi_processor/multi/uni_processor.py branches/multi_processor/relax Modified: branches/multi_processor/multi/mpi4py_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/mpi4py_processor.py?rev=3278&r1=3277&r2=3278&view=diff ============================================================================== --- branches/multi_processor/multi/mpi4py_processor.py (original) +++ branches/multi_processor/multi/mpi4py_processor.py Tue May 1 00:24:05 2007 @@ -28,12 +28,10 @@ import os import math import textwrap -import traceback -import time import Queue import threading -from multi.processor import Processor,Memo,Slave_command +from multi.processor import Processor from multi.processor import Result,Result_command,Result_string,Result_exception from multi.commands import Exit_command @@ -142,6 +140,7 @@ pass RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command() +#FIXME: move up a level or more class Result_queue(object): def __init__(self,mpi4py_processor): self.mpi4py_processor = mpi4py_processor @@ -153,6 +152,7 @@ def run_all(self): raise_unimplimented(self.run_all) +#FIXME: move up a level or more class Threaded_result_queue(Result_queue): def __init__(self,mpi4py_processor): super(Threaded_result_queue,self).__init__(mpi4py_processor) @@ -182,6 +182,7 @@ self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND) self.thread1.join() +#FIXME: move up a level or more class Immediate_result_queue(Result_queue): def __init(self,mpi4py_processor): super(Threaded_result_queue,self).__init__(mpi4py_processor) @@ -193,13 +194,31 @@ def run_all(self): pass +class Too_few_slaves_exception(Exception): 
+ def __init__(self): + msg = 'mpi4py requires at least 2 mpi processors to run you only provided 1, exiting....' + Exception.__init__(self,msg) + #FIXME: do some inheritance class Mpi4py_processor(Processor): - def __init__(self,callback): - super(Mpi4py_processor,self).__init__(callback=callback) + def __init__(self,processor_size,callback): + mpi_processor_size=MPI.size-1 + + if processor_size == -1: + processor_size = mpi_processor_size + + # FIXME: needs better support in relax generates stack trace + if mpi_processor_size == 0: + raise Too_few_slaves_exception() + + msg = 'warning: mpi4py_processor is using 1 masters and %d slave processors you requested %d slaves\n' + if processor_size != (mpi_processor_size): + print msg % (mpi_processor_size,processor_size) + + super(Mpi4py_processor,self).__init__(processor_size=mpi_processor_size,callback=callback) @@ -220,6 +239,7 @@ def abort(self): MPI.COMM_WORLD.Abort() + #TODO: move up a level def add_to_queue(self,command,memo=None): self.command_queue.append(command) if memo != None: @@ -229,14 +249,13 @@ def rank(self): return MPI.rank - def processor_size(self): - return MPI.size -1 - + + #TODO: MAY NEED support for widths? 
def get_intro_string(self): version_info = MPI.Get_version() - return '''MPI running via mpi4py with %d slave processors, mpi version = %s.%s''' % (self.processor_size(),version_info[0],version_info[1]) - - + return '''MPI running via mpi4py with %d slave processors & 1 master, mpi version = %s.%s''' % (self.processor_size(),version_info[0],version_info[1]) + + #TODO: move up a level def chunk_queue(self,queue): lqueue=copy(queue) result = [] @@ -254,6 +273,7 @@ result[i].append(elem) return result + #TODO: move up a level def run_queue(self): #FIXME: need a finally here to cleanup exceptions states lqueue = self.chunk_queue(self.command_queue) @@ -262,6 +282,7 @@ del self.command_queue[:] self.memo_map.clear() + #TODO: move up a level def assert_on_master(self): if self.on_slave(): msg = 'running on slave when expected master with MPI.rank == 0, rank was %d'% MPI.rank @@ -274,7 +295,11 @@ # CHECKME am i used def exit(self): exit_mpi() - + #TODO: move to multiprocessor level + def return_result_command(self,result_object): + MPI.COMM_WORLD.Send(buf=result_object, dest=0) + + #TODO: move up a level add send and revieve virtual functions def return_object(self,result): result_object = None @@ -298,7 +323,8 @@ if result_object != None: #FIXME check is used? result_object.rank=MPI.rank - MPI.COMM_WORLD.Send(buf=result_object, dest=0) + self.return_result_command(result_object=result_object) + # def queue_result_processing(self,result): # # exceptions are handled instantly not queued to avoid deadlock! 
@@ -331,6 +357,7 @@ message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) raise Exception(message) + #TODO: move up a level add virtaul send and revieve functions def run_command_queue(self,queue): self.assert_on_master() @@ -348,14 +375,15 @@ if len(queue) != 0: command = queue.pop() dest=idle_set.pop() - MPI.COMM_WORLD.Send(buf=command,dest=dest) + self.master_queue_command(command=command,dest=dest) running_set.add(dest) else: break while len(running_set) !=0: - result = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) + result = self.master_recieve_result() + #if isinstance(result, Result_exception): # print 'result', result # sys.exit() @@ -368,21 +396,30 @@ if self.threaded_result_processing: result_queue.run_all() - - - + #TODO: move to multiprocessor level + def master_queue_command(self,command,dest): + MPI.COMM_WORLD.Send(buf=command,dest=dest) + #TODO: move to multiprocessor level + def master_recieve_result(self): + return MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE) + #TODO: move up a level def on_master(self): result = False if MPI.rank ==0: result = True return result - + #TODO: move up a level def print_message(self,message): f=open ('error' + `self.rank()` + '.txt','a') f.write(message+'\n') f.flush() f.close() + #TODO: move to multiprocessor level + def slave_recieve_commands(self): + return MPI.COMM_WORLD.Recv(source=0) + + #TODO: move up a level and add virtual send and recieve def run(self): global in_main_loop @@ -405,7 +442,8 @@ while not self.do_quit: try: - commands = MPI.COMM_WORLD.Recv(source=0) + commands = self.slave_recieve_commands() + if not isinstance(commands,list): Modified: branches/multi_processor/multi/processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/processor.py?rev=3278&r1=3277&r2=3278&view=diff ============================================================================== --- branches/multi_processor/multi/processor.py (original) +++ 
branches/multi_processor/multi/processor.py Tue May 1 00:24:05 2007 @@ -70,12 +70,13 @@ class Processor(object): #FIXME: remname chunk* grain* - def __init__(self,callback): + def __init__(self,processor_size,callback): self.callback=callback self.chunkyness=1 self.pre_queue_command=None self.post_queue_command=None self.NULL_RESULT=Null_result_command(processor=self) + self._processor_size=processor_size def add_to_queue(self,command,memo=None): raise_unimplimented(self.add_to_queue) @@ -106,7 +107,7 @@ raise_unimplimented(self.rank) def processor_size(self): - raise_unimplimented(self.processor_size()) + return self._processor_size def get_intro_string(self): raise_unimplimented(self.get_intro_string) Modified: branches/multi_processor/multi/uni_processor.py URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/multi/uni_processor.py?rev=3278&r1=3277&r2=3278&view=diff ============================================================================== --- branches/multi_processor/multi/uni_processor.py (original) +++ branches/multi_processor/multi/uni_processor.py Tue May 1 00:24:05 2007 @@ -21,6 +21,7 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # # ################################################################################ +from errors import RelaxWarnings import threading, Queue import sys,os import multi @@ -71,8 +72,12 @@ class Uni_processor(Processor): - def __init__(self,callback): - super(Uni_processor,self).__init__(callback=callback) + def __init__(self,processor_size,callback): + super(Uni_processor,self).__init__(processor_size=1,callback=callback) + + if processor_size > 1: + print 'warning: uniprocessor can only support 1 processor you requested %d' % processor_size + print 'continuing...\n' self.command_queue=[] Modified: branches/multi_processor/relax URL: http://svn.gna.org/viewcvs/relax/branches/multi_processor/relax?rev=3278&r1=3277&r2=3278&view=diff 
============================================================================== --- branches/multi_processor/relax (original) +++ branches/multi_processor/relax Tue May 1 00:24:05 2007 @@ -250,10 +250,14 @@ # parser.add_option('--thread', action='store_true', dest='thread', default=0, help='run relax in threading mode (this mode should not be invoked by a user)') parser.add_option('-v', '--version', action='store_true', dest='version', default=0, help='show the version number and exit') parser.add_option('-m', '--multi', action='store', type='string', dest='multiprocessor', default='uni', help='set multi processor method') - parser.add_option('-n', '--processors', action='store', type='int', dest='n_processors', default=1, help='set number of processors (may be ignored)') + parser.add_option('-n', '--processors', action='store', type='int', dest='n_processors', default=-1, help='set number of processors (may be ignored)') + + + # Parse the options. (options, args) = parser.parse_args(args) + # Debugging flag. 
if options.debug: @@ -564,7 +568,7 @@ #FIXME error checking for if module require not found #FIXME move module loading to processor #FIXME module loading code needs to be in a util module -def load_multiprocessor(callback): +def load_multiprocessor(callback, processor_size): processor_name = relax_instance.multiprocessor_type + '_processor' class_name= processor_name[0].upper() + processor_name[1:] @@ -579,7 +583,7 @@ else: raise RelaxError("can't load class %s from module %s" % (class_name,module_path)) - object = clazz(callback) + object = clazz(callback=callback,processor_size=processor_size) relax_instance.processor = object return object @@ -593,9 +597,9 @@ if not profile_flag: relax_instance = Relax() - mode=relax_instance.arguments(sys.argv[1:]) + relax_instance.arguments(sys.argv[1:]) callbacks = Application_callback(master=relax_instance) - multi_processor = load_multiprocessor(callbacks) + multi_processor = load_multiprocessor(callbacks, processor_size=relax_instance.n_processors) multi_processor.run() else: