Author: bugman Date: Thu Aug 25 10:38:14 2011 New Revision: 14425 URL: http://svn.gna.org/viewcvs/relax?rev=14425&view=rev Log: Removed the multi_processor module as this is now called multi_processor_base. This was deleted a long time ago, but somehow survived in one of the branches and was merged back in! Removed: 1.3/multi/multi_processor.py Modified: 1.3/multi/__init__.py Modified: 1.3/multi/__init__.py URL: http://svn.gna.org/viewcvs/relax/1.3/multi/__init__.py?rev=14425&r1=14424&r2=14425&view=diff ============================================================================== --- 1.3/multi/__init__.py (original) +++ 1.3/multi/__init__.py Thu Aug 25 10:38:14 2011 @@ -24,7 +24,6 @@ __all__ = ['commands', 'mpi4py_processor', - 'multi_processor', 'multi_processor_base', 'processor_io', 'processor', Removed: 1.3/multi/multi_processor.py URL: http://svn.gna.org/viewcvs/relax/1.3/multi/multi_processor.py?rev=14424&view=auto ============================================================================== --- 1.3/multi/multi_processor.py (original) +++ 1.3/multi/multi_processor.py (removed) @@ -1,415 +1,0 @@ -############################################################################### -# # -# Copyright (C) 2007 Gary S Thompson (see https://gna.org/users/varioustoxins # -# for contact details) # -# # -# # -# This file is part of the program relax. # -# # -# relax is free software; you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation; either version 2 of the License, or # -# (at your option) any later version. # -# # -# relax is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. 
# -# # -# You should have received a copy of the GNU General Public License # -# along with relax; if not, write to the Free Software # -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # -# # -################################################################################ -# rename me I look like a processor! not a base class (change to multi_processor_base?) so relax --multi multi will try to load me ;-) - -import traceback - - -import threading -import math -import sys - -from copy import copy -import Queue - -from multi.processor import Processor -from multi.processor import Result,Result_command,Result_string,Result_exception -from multi.processor import raise_unimplimented -from multi.processor import Capturing_exception - - - - - - -class Batched_result_command(Result_command): - - def __init__(self,processor,result_commands,completed=True): - super(Batched_result_command,self).__init__(processor=processor,completed=completed) - self.result_commands=result_commands - - - def run(self,processor,batched_memo): - - processor.assert_on_master() - if batched_memo != None: - msg = "batched result commands shouldn't have memo values, memo: " + `batched_memo` - raise ValueError(msg) - - for result_command in self.result_commands: - processor.process_result(result_command) - - -class Exit_queue_result_command(Result_command): - def __init__(self,completed=True): - pass - -RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command() -#FIXME: move up a level or more -class Result_queue(object): - def __init__(self,processor): - self.processor = processor - - def put(self,job): - if isinstance(job, Result_exception) : - self.processor.process_result(job) - - def run_all(self): - raise_unimplimented(self.run_all) - -#FIXME: move up a level or more -class Threaded_result_queue(Result_queue): - def __init__(self,processor): - super(Threaded_result_queue,self).__init__(processor) - self.queue = Queue.Queue() - self.sleep_time =0.05 - 
self.processor=processor - self.running=1 - # FIXME: syntax error here produces exception but no quit - self.thread1 = threading.Thread(target=self.workerThread) - self.thread1.setDaemon(1) - self.thread1.start() - - def workerThread(self): - - try: - while True: - job=self.queue.get() - if job == RESULT_QUEUE_EXIT_COMMAND: - break - self.processor.process_result(job) - except: - traceback.print_exc(file=sys.stdout) - # FIXME: this doesn't work because this isn't the main thread so sys.exit fails... - self.processor.abort() - - - def put(self,job): - super(Threaded_result_queue,self).put(job) - self.queue.put_nowait(job) - - def run_all(self): - self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND) - self.thread1.join() - -#FIXME: move up a level or more -class Immediate_result_queue(Result_queue): - def __init(self,processor): - super(Threaded_result_queue,self).__init__(processor) - - def put(self,job): - super(Immediate_result_queue,self).put(job) - try: - self.processor.process_result(job) - except: - traceback.print_exc(file=sys.stdout) - # FIXME: this doesn't work because this isn't the main thread so sys.exit fails... - self.processor.abort() - - def run_all(self): - pass - -class Too_few_slaves_exception(Exception): - def __init__(self): - msg = 'master slave processing requires at least 2 processors to run you only provided 1, exiting....' 
- Exception.__init__(self,msg) - - - -class Multi_processor(Processor): - - def __init__(self,processor_size,callback,stdio_capture=None): - super(Multi_processor,self).__init__(processor_size=processor_size,callback=callback,stdio_capture=stdio_capture) - - self.do_quit=False - - #FIXME un clone from uniprocessor - #command queue and memo queue - self.command_queue=[] - self.memo_map={} - - self.batched_returns=True - self.result_list=None - - self.threaded_result_processing=True - - def pre_run(self): - super(Multi_processor,self).pre_run() - - self.capture_stdio() -# self.save_stdout = sys.stdout -# self.save_stderr = sys.stderr - -# if self.processor_size() > 1 and self.rank() == 0: -# -# pre_string = 'M'*self.rank_format_string_width() -# -# sys.stdout = PrependOut(pre_string + ' S> ', sys.stdout) -# #FIXME: seems to be that writing to stderr results leeds to incorrect serialisation of output -# sys.stderr = PrependOut(pre_string + ' E> ', sys.__stdout__) -# -# else: -# # add debug flag or extra channels that output immediately -# if self.processor_size() > 1: -# pre_string = self.rank_format_string() % self.rank() -# stderr_string = ' E> ' -# stdout_string = ' S> ' -# else: -# pre_string = '' -# stderr_string = '' -# stdout_string = '' -# sys.stdout = PrependStringIO(pre_string + stdout_string) -# sys.stderr = PrependStringIO(pre_string + stderr_string,target_stream=sys.stdout) -# - - - - def post_run(self): - - self.restore_stdio() - -# if self.processor_size() > 1: -# if id(sys.stderr) != id(sys.__stderr__): -# sys.stderr.close() -# sys.stderr = sys.__stderr__ -# -# if id(sys.stdout) != id(sys.__stdout__): -# sys.stdout.close() -# sys.stdout = sys.__stdout__ - - super(Multi_processor,self).post_run() - - - #TODO: move up a level - def assert_on_master(self): - if self.on_slave(): - msg = 'running on slave when expected master with MPI.rank == 0, rank was %d'% self.rank() - raise Exception(msg) - - #FIXME: fill out generic result processing move to processor 
- def process_result(self,result): - - - - if isinstance(result, Result): - - if isinstance(result, Result_command): - memo=None - if result.memo_id != None: - memo=self.memo_map[result.memo_id] - result.run(self,memo) - if result.memo_id != None and result.completed: - del self.memo_map[result.memo_id] - - elif isinstance(result, Result_string): - #FIXME can't cope with multiple lines - sys.__stdout__.write(result.string), - else: - message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__,result) - raise Exception(message) - - #TODO: move up a level - def add_to_queue(self,command,memo=None): - self.command_queue.append(command) - if memo != None: - command.set_memo_id(memo) - self.memo_map[memo.memo_id()]=memo - - # FIXME move to lower level - def on_master(self): - if self.rank() == 0: - return True - - # FIXME move to lower level - def on_slave(self): - return not self.on_master() - - #TODO: move up a level - def chunk_queue(self,queue): - lqueue=copy(queue) - result = [] - processors = self.processor_size() - chunks = processors * self.grainyness - chunk_size = int(math.floor(float(len(queue)) / float(chunks))) - - if chunk_size < 1: - result = queue - else: - for i in range(chunks): - result.append(lqueue[:chunk_size]) - del lqueue[:chunk_size] - for i,elem in enumerate(lqueue): - result[i].append(elem) - return result - - #TODO: move up a level - def run_queue(self): - #FIXME: need a finally here to cleanup exceptions states - lqueue = self.chunk_queue(self.command_queue) - self.run_command_queue(lqueue) - - del self.command_queue[:] - self.memo_map.clear() - - #TODO: move up a level add send and revieve virtual functions - def return_object(self,result): - result_object = None - #raise Exception('dummy') - if isinstance(result, Result_exception): - result_object=result - elif self.batched_returns: - is_batch_result = isinstance(result, Batched_result_command) - - - if is_batch_result: - result_object = result - else: - if 
self.result_list != None: - self.result_list.append(result) - else: - result_object=result - - - if result_object != None: - #FIXME check is used? - result_object.rank=self.rank() - self.return_result_command(result_object=result_object) - - #TODO: move up a level add virtaul send and revieve functions - def run_command_queue(self,queue): - self.assert_on_master() - - running_set=set() - idle_set=set([i for i in range(1,self.processor_size()+1)]) - - if self.threaded_result_processing: - result_queue=Threaded_result_queue(self) - else: - result_queue=Immediate_result_queue(self) - - while len(queue) != 0: - - while len(idle_set) != 0: - if len(queue) != 0: - command = queue.pop() - dest=idle_set.pop() - self.master_queue_command(command=command,dest=dest) - running_set.add(dest) - else: - break - - - while len(running_set) !=0: - result = self.master_recieve_result() - #if isinstance(result, Result_exception): - # print 'result', result - # sys.exit() - - if result.completed: - idle_set.add(result.rank) - print 'idle set', `idle_set` - print 'running_set', `running_set` - running_set.remove(result.rank) - - result_queue.put(result) - - if self.threaded_result_processing: - result_queue.run_all() - - def create_slaves(self,processor_size): - pass - - #TODO: move up a level and add virtual send and recieve - def run(self): - - - self.pre_run() - if self.on_master(): - try: - self.create_slaves(self.processor_size()) - self.callback.init_master(self) - - except Exception,e: - self.callback.handle_exception(self,e) - - - - - else: - - while not self.do_quit: - try: - - commands = self.slave_recieve_commands() - - - - if not isinstance(commands,list): - commands = [commands] - last_command = len(commands)-1 - - if self.batched_returns: - self.result_list = [] - else: - self.result_list = None - - for i,command in enumerate(commands): - - #raise Exception('dummy') - completed = (i == last_command) - command.run(self,completed) - - - - if self.batched_returns: - 
self.return_object(Batched_result_command(processor=self,result_commands=self.result_list)) - self.result_list=None - - - except: - - capturing_exception = Capturing_exception(rank=self.rank(),name=self.get_name()) - exception_result = Result_exception(exception=capturing_exception,processor=self,completed=True) - - self.return_object(exception_result) - self.result_list=None - - self.post_run() - if self.on_master(): - # note this a modified exit that kills all MPI processors - sys.exit() - - def return_result_command(self,result_object): - raise_unimplimented(self.slave_queue_result) - - - def master_queue_command(self,command,dest): - raise_unimplimented(self.master_queue_command) - - - def master_recieve_result(self): - raise_unimplimented(self.master_recieve_result) - - def slave_recieve_commands(self): - raise_unimplimented(self.slave_recieve_commands)