mailr7843 - in /branches/multi_processor_merge/multi: multi_processor.py multi_processor_base.py


Others Months | Index by Date | Thread Index
>>   [Date Prev] [Date Next] [Thread Prev] [Thread Next]

Header


Content

Posted by edward on October 19, 2008 - 21:31:
Author: bugman
Date: Sun Oct 19 21:31:45 2008
New Revision: 7843

URL: http://svn.gna.org/viewcvs/relax?rev=7843&view=rev
Log:
Renamed the multi_processor module to multi_processor_base.

This was mentioned in the module and by Gary in the detailed post:
https://mail.gna.org/public/relax-devel/2007-05/msg00000.html.


Added:
    branches/multi_processor_merge/multi/multi_processor_base.py
      - copied, changed from r7842, 
branches/multi_processor_merge/multi/multi_processor.py
Removed:
    branches/multi_processor_merge/multi/multi_processor.py

Removed: branches/multi_processor_merge/multi/multi_processor.py
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/multi_processor.py?rev=7842&view=auto
==============================================================================
--- branches/multi_processor_merge/multi/multi_processor.py (original)
+++ branches/multi_processor_merge/multi/multi_processor.py (removed)
@@ -1,413 +1,0 @@
-###############################################################################
-#                                                                            
 #
-# Copyright (C) 2007  Gary S Thompson (https://gna.org/users/varioustoxins)  
 #
-#                                                                            
 #
-# This file is part of the program relax.                                    
 #
-#                                                                            
 #
-# relax is free software; you can redistribute it and/or modify              
 #
-# it under the terms of the GNU General Public License as published by       
 #
-# the Free Software Foundation; either version 2 of the License, or          
 #
-# (at your option) any later version.                                        
 #
-#                                                                            
 #
-# relax is distributed in the hope that it will be useful,                   
 #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of             
 #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              
 #
-# GNU General Public License for more details.                               
 #
-#                                                                            
 #
-# You should have received a copy of the GNU General Public License          
 #
-# along with relax; if not, write to the Free Software                       
 #
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  
 #
-#                                                                            
 #
-###############################################################################
-# rename me I look like a processor! not a base class (change to 
multi_processor_base?) so relax --multi multi will try to load me ;-)
-
-# Module docstring.
-"""Module containing a Processor base class to be used by any 
multi-processor methodology.
-
-This is used by the mpi4py clustering code.  It can also be used by any new 
implementation
-including, for example:
-
-    - Other implementations using different python MPI libraries (pypar, 
etc.).
-    - Use of ssh tunnels for parallel programming.
-    - Use of the twisted frame work for communication 
(http://twistedmatrix.com/projects/).
-    - The parallel virtual machine (pvm) via pypvm 
(http://pypvm.sourceforge.net).
-"""
-
-# Python module imports.
-import Queue
-from copy import copy
-import math
-import sys
-import threading
-import traceback
-
-# relax module imports.
-from multi.processor import raise_unimplemented, Capturing_exception, 
Processor, Result, Result_command, Result_string, Result_exception
-
-
-class Batched_result_command(Result_command):
-    def __init__(self, processor, result_commands, completed=True):
-        super(Batched_result_command, self).__init__(processor=processor, 
completed=completed)
-        self.result_commands = result_commands
-
-
-    def run(self, processor, batched_memo):
-        processor.assert_on_master()
-        if batched_memo != None:
-            msg = "batched result commands shouldn't have memo values, memo: 
" + `batched_memo`
-            raise ValueError(msg)
-
-        for result_command in self.result_commands:
-            processor.process_result(result_command)
-
-
-class Exit_queue_result_command(Result_command):
-    def __init__(self, completed=True):
-        pass
-
-
-RESULT_QUEUE_EXIT_COMMAND = Exit_queue_result_command()
-#FIXME: move up a level or more
-#FIXME: move up a level or more
-class Immediate_result_queue(Result_queue):
-    def __init(self, processor):
-        super(Threaded_result_queue, self).__init__(processor)
-
-
-    def put(self, job):
-        super(Immediate_result_queue, self).put(job)
-        try:
-            self.processor.process_result(job)
-        except:
-            traceback.print_exc(file=sys.stdout)
-            # FIXME: this doesn't work because this isn't the main thread so 
sys.exit fails...
-            self.processor.abort()
-
-
-    def run_all(self):
-        pass
-
-
-class Multi_processor(Processor):
-    """The multi-processor base class."""
-
-    def __init__(self, processor_size, callback, stdio_capture=None):
-        super(Multi_processor, self).__init__(processor_size=processor_size, 
callback=callback, stdio_capture=stdio_capture)
-
-        self.do_quit = False
-
-        #FIXME un clone from uniprocessor
-        #command queue and memo queue
-        self.command_queue = []
-        self.memo_map = {}
-
-        self.batched_returns = True
-        self.result_list = None
-
-        self.threaded_result_processing = True
-
-
-    #TODO: move up a level
-    def add_to_queue(self, command, memo=None):
-        self.command_queue.append(command)
-        if memo != None:
-            command.set_memo_id(memo)
-            self.memo_map[memo.memo_id()] = memo
-
-
-    #TODO: move up a level
-    def assert_on_master(self):
-        if self.on_slave():
-            msg = 'running on slave when expected master with MPI.rank == 0, 
rank was %d'% self.rank()
-            raise Exception(msg)
-
-
-    #TODO: move up a level
-    def chunk_queue(self, queue):
-        lqueue = copy(queue)
-        result = []
-        processors = self.processor_size()
-        chunks = processors * self.grainyness
-        chunk_size = int(math.floor(float(len(queue)) / float(chunks)))
-
-        if chunk_size < 1:
-            result = queue
-        else:
-            for i in range(chunks):
-                result.append(lqueue[:chunk_size])
-                del lqueue[:chunk_size]
-            for i, elem in enumerate(lqueue):
-                result[i].append(elem)
-        return result
-
-
-    def create_slaves(self, processor_size):
-        pass
-
-
-    def master_queue_command(self, command, dest):
-        raise_unimplemented(self.master_queue_command)
-
-
-    def master_recieve_result(self):
-        raise_unimplemented(self.master_recieve_result)
-
-
-    # FIXME move to lower level
-    def on_master(self):
-        if self.rank() == 0:
-            return True
-
-
-    # FIXME move to lower level
-    def on_slave(self):
-        return not self.on_master()
-
-
-    def post_run(self):
-        self.restore_stdio()
-
-#        if self.processor_size() > 1:
-#           if id(sys.stderr) != id(sys.__stderr__):
-#               sys.stderr.close()
-#               sys.stderr = sys.__stderr__
-#
-#           if id(sys.stdout) != id(sys.__stdout__):
-#               sys.stdout.close()
-#               sys.stdout = sys.__stdout__
-
-        super(Multi_processor, self).post_run()
-
-
-    def pre_run(self):
-        super(Multi_processor, self).pre_run()
-
-        self.capture_stdio()
-#        self.save_stdout = sys.stdout
-#        self.save_stderr = sys.stderr
-
-#        if self.processor_size() > 1 and self.rank() == 0:
-#
-#            pre_string = 'M'*self.rank_format_string_width()
-#
-#            sys.stdout = PrependOut(pre_string + ' S> ', sys.stdout)
-#            #FIXME: seems to be that writing to stderr results leeds to 
incorrect serialisation of output
-#            sys.stderr = PrependOut(pre_string + ' E> ', sys.__stdout__)
-#
-#        else:
-#            # add debug flag or extra channels that output immediately
-#            if self.processor_size() > 1:
-#                pre_string = self.rank_format_string() % self.rank()
-#                stderr_string = ' E> '
-#                stdout_string = ' S> '
-#            else:
-#                pre_string = ''
-#                stderr_string = ''
-#                stdout_string = ''
-#            sys.stdout = PrependStringIO(pre_string + stdout_string)
-#            sys.stderr = PrependStringIO(pre_string + stderr_string, 
target_stream=sys.stdout)
-
-
-    #FIXME: fill out generic result processing move to processor
-    def process_result(self, result):
-        if isinstance(result, Result):
-            if isinstance(result, Result_command):
-                memo = None
-                if result.memo_id != None:
-                    memo = self.memo_map[result.memo_id]
-                result.run(self, memo)
-                if result.memo_id != None and result.completed:
-                    del self.memo_map[result.memo_id]
-
-            elif isinstance(result, Result_string):
-                #FIXME can't cope with multiple lines
-                sys.__stdout__.write(result.string),
-        else:
-            message = 'Unexpected result type \n%s \nvalue%s' 
%(result.__class__.__name__, result)
-            raise Exception(message)
-
-
-    #TODO: move up a level add send and revieve virtual functions
-    def return_object(self, result):
-        result_object = None
-        #raise Exception('dummy')
-        if isinstance(result, Result_exception):
-            result_object = result
-        elif self.batched_returns:
-            is_batch_result = isinstance(result, Batched_result_command)
-
-            if is_batch_result:
-                result_object = result
-            else:
-                if self.result_list != None:
-                    self.result_list.append(result)
-        else:
-            result_object = result
-
-        if result_object != None:
-            #FIXME check is used?
-            result_object.rank = self.rank()
-            self.return_result_command(result_object=result_object)
-
-
-    def return_result_command(self, result_object):
-        raise_unimplemented(self.slave_queue_result)
-
-
-    #TODO: move up a level and add virtual send and recieve
-    def run(self):
-        self.pre_run()
-        if self.on_master():
-            try:
-                self.create_slaves(self.processor_size())
-                self.callback.init_master(self)
-
-            except Exception, e:
-                self.callback.handle_exception(self, e)
-
-        else:
-            while not self.do_quit:
-                try:
-                    commands = self.slave_recieve_commands()
-                    if not isinstance(commands, list):
-                        commands = [commands]
-                    last_command = len(commands)-1
-
-                    if self.batched_returns:
-                        self.result_list = []
-                    else:
-                        self.result_list = None
-
-                    for i, command in enumerate(commands):
-
-                        #raise Exception('dummy')
-                        completed = (i == last_command)
-                        command.run(self, completed)
-
-                    if self.batched_returns:
-                        
self.return_object(Batched_result_command(processor=self, 
result_commands=self.result_list))
-                        self.result_list = None
-
-                except:
-                    capturing_exception = 
Capturing_exception(rank=self.rank(), name=self.get_name())
-                    exception_result = 
Result_exception(exception=capturing_exception, processor=self, 
completed=True)
-
-                    self.return_object(exception_result)
-                    self.result_list = None
-
-        self.post_run()
-        if self.on_master():
-            # note this a modified exit that kills all MPI processors
-            sys.exit()
-
-
-    #TODO: move up a level add virtaul send and revieve functions
-    def run_command_queue(self, queue):
-            self.assert_on_master()
-
-            running_set = set()
-            idle_set = set([i for i in range(1, self.processor_size()+1)])
-
-            if self.threaded_result_processing:
-                result_queue = Threaded_result_queue(self)
-            else:
-                result_queue = Immediate_result_queue(self)
-
-            while len(queue) != 0:
-
-                while len(idle_set) != 0:
-                    if len(queue) != 0:
-                        command = queue.pop()
-                        dest = idle_set.pop()
-                        self.master_queue_command(command=command, dest=dest)
-                        running_set.add(dest)
-                    else:
-                        break
-
-                while len(running_set) !=0:
-                    result = self.master_recieve_result()
-                    #if isinstance(result, Result_exception):
-                    #    print 'result', result
-                    #    sys.exit()
-
-                    if result.completed:
-                        idle_set.add(result.rank)
-                        print 'idle set', `idle_set`
-                        print 'running_set', `running_set`
-                        running_set.remove(result.rank)
-
-                    result_queue.put(result)
-
-            if self.threaded_result_processing:
-                result_queue.run_all()
-
-
-    #TODO: move up a level
-    def run_queue(self):
-        #FIXME: need a finally here to cleanup exceptions states
-         lqueue = self.chunk_queue(self.command_queue)
-         self.run_command_queue(lqueue)
-
-         del self.command_queue[:]
-         self.memo_map.clear()
-
-
-    def slave_recieve_commands(self):
-        raise_unimplemented(self.slave_recieve_commands)
-
-
-#FIXME: move up a level or more
-class Result_queue(object):
-    def __init__(self, processor):
-        self.processor = processor
-
-
-    def put(self, job):
-        if isinstance(job, Result_exception) :
-            self.processor.process_result(job)
-
-
-    def run_all(self):
-        raise_unimplemented(self.run_all)
-
-
-class Threaded_result_queue(Result_queue):
-    def __init__(self, processor):
-        super(Threaded_result_queue, self).__init__(processor)
-        self.queue = Queue.Queue()
-        self.sleep_time = 0.05
-        self.processor = processor
-        self.running = 1
-        # FIXME: syntax error here produces exception but no quit
-        self.thread1 = threading.Thread(target=self.workerThread)
-        self.thread1.setDaemon(1)
-        self.thread1.start()
-
-
-    def put(self, job):
-        super(Threaded_result_queue, self).put(job)
-        self.queue.put_nowait(job)
-
-
-    def run_all(self):
-        self.queue.put_nowait(RESULT_QUEUE_EXIT_COMMAND)
-        self.thread1.join()
-
-
-    def workerThread(self):
-            try:
-                while True:
-                    job = self.queue.get()
-                    if job == RESULT_QUEUE_EXIT_COMMAND:
-                        break
-                    self.processor.process_result(job)
-            except:
-                traceback.print_exc(file=sys.stdout)
-                # FIXME: this doesn't work because this isn't the main 
thread so sys.exit fails...
-                self.processor.abort()
-
-
-class Too_few_slaves_exception(Exception):
-    def __init__(self):
-        msg = 'master slave processing requires at least 2 processors to run 
you only provided 1, exiting....'
-        Exception.__init__(self, msg)

Copied: branches/multi_processor_merge/multi/multi_processor_base.py (from 
r7842, branches/multi_processor_merge/multi/multi_processor.py)
URL: 
http://svn.gna.org/viewcvs/relax/branches/multi_processor_merge/multi/multi_processor_base.py?p2=branches/multi_processor_merge/multi/multi_processor_base.py&p1=branches/multi_processor_merge/multi/multi_processor.py&r1=7842&r2=7843&rev=7843&view=diff
==============================================================================
--- branches/multi_processor_merge/multi/multi_processor.py (original)
+++ branches/multi_processor_merge/multi/multi_processor_base.py Sun Oct 19 
21:31:45 2008
@@ -19,7 +19,6 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  
 #
 #                                                                            
 #
 
###############################################################################
-# rename me I look like a processor! not a base class (change to 
multi_processor_base?) so relax --multi multi will try to load me ;-)
 
 # Module docstring.
 """Module containing a Processor base class to be used by any 
multi-processor methodology.




Related Messages


Powered by MHonArc, Updated Sun Oct 19 21:40:02 2008