Package multi :: Module multi_processor_base
[hide private]
[frames] | no frames]

Source Code for Module multi.multi_processor_base

  1  ############################################################################### 
  2  #                                                                             # 
  3  # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    # 
  4  # Copyright (C) 2011-2012 Edward d'Auvergne                                   # 
  5  #                                                                             # 
  6  # This file is part of the program relax.                                     # 
  7  #                                                                             # 
  8  # relax is free software; you can redistribute it and/or modify               # 
  9  # it under the terms of the GNU General Public License as published by        # 
 10  # the Free Software Foundation; either version 2 of the License, or           # 
 11  # (at your option) any later version.                                         # 
 12  #                                                                             # 
 13  # relax is distributed in the hope that it will be useful,                    # 
 14  # but WITHOUT ANY WARRANTY; without even the implied warranty of              # 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               # 
 16  # GNU General Public License for more details.                                # 
 17  #                                                                             # 
 18  # You should have received a copy of the GNU General Public License           # 
 19  # along with relax; if not, write to the Free Software                        # 
 20  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA   # 
 21  #                                                                             # 
 22  ############################################################################### 
 23   
 24  # Module docstring. 
 25  """Module containing a Processor base class to be used by any multi-processor fabric. 
 26   
 27  This is used by the mpi4py clustering code.  It can also be used by any new implementation 
 28  including, for example: 
 29   
 30      - Other implementations using different python MPI libraries (pypar, etc.). 
 31      - Use of ssh tunnels for parallel programming. 
 32      - Use of the twisted frame work for communication (http://twistedmatrix.com/projects/). 
 33      - The parallel virtual machine (pvm) via pypvm (http://pypvm.sourceforge.net). 
 34  """ 
 35   
 36  # Python module imports. 
 37  import Queue 
 38  from copy import copy 
 39  import math 
 40  import sys 
 41  import threading 
 42  import traceback 
 43   
 44  # multi module imports. 
 45  from multi.misc import raise_unimplemented, Result, Result_string, Verbosity; verbosity = Verbosity() 
 46  from multi.processor import Processor 
 47  from multi.result_commands import Batched_result_command, Result_command, Result_exception 
 48   
 49   
50 -class Multi_processor(Processor):
51 """The multi-processor base class.""" 52
53 - def __init__(self, processor_size, callback):
54 super(Multi_processor, self).__init__(processor_size=processor_size, callback=callback) 55 56 self.do_quit = False 57 58 #FIXME un clone from uniprocessor 59 #command queue and memo queue 60 self.command_queue = [] 61 self.memo_map = {} 62 63 self.batched_returns = True 64 self.result_list = None
65 66 67 #TODO: move up a level
68 - def add_to_queue(self, command, memo=None):
69 self.command_queue.append(command) 70 if memo != None: 71 command.set_memo_id(memo) 72 self.memo_map[memo.memo_id()] = memo
73 74 75 #TODO: move up a level
76 - def chunk_queue(self, queue):
77 lqueue = copy(queue) 78 result = [] 79 processors = self.processor_size() 80 chunks = processors * self.grainyness 81 chunk_size = int(math.floor(float(len(queue)) / float(chunks))) 82 83 if chunk_size < 1: 84 result = queue 85 else: 86 for i in range(chunks): 87 result.append(lqueue[:chunk_size]) 88 del lqueue[:chunk_size] 89 for i, elem in enumerate(lqueue): 90 result[i].append(elem) 91 return result
92 93 94 # FIXME move to lower level
95 - def on_master(self):
96 if self.rank() == 0: 97 return True
98 99 100 # FIXME move to lower level
101 - def on_slave(self):
102 return not self.on_master()
103 104
105 - def post_run(self):
106 107 super(Multi_processor, self).post_run()
108 109
110 - def pre_run(self):
111 """Method called before starting the application main loop""" 112 113 # Execute the base class method. 114 super(Multi_processor, self).pre_run()
115 116 117 #FIXME: fill out generic result processing move to processor
118 - def process_result(self, result):
119 if isinstance(result, Result): 120 if isinstance(result, Result_command): 121 memo = None 122 if result.memo_id != None: 123 memo = self.memo_map[result.memo_id] 124 result.run(self, memo) 125 if result.memo_id != None and result.completed: 126 del self.memo_map[result.memo_id] 127 128 elif isinstance(result, Result_string): 129 #FIXME can't cope with multiple lines 130 sys.stdout.write(result.string) 131 else: 132 message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__, result) 133 raise Exception(message)
134 135 136 #TODO: move up a level add send and revieve virtual functions
137 - def return_object(self, result):
138 result_object = None 139 #raise Exception('dummy') 140 if isinstance(result, Result_exception): 141 result_object = result 142 elif self.batched_returns: 143 is_batch_result = isinstance(result, Batched_result_command) 144 145 if is_batch_result: 146 result_object = result 147 else: 148 if self.result_list != None: 149 self.result_list.append(result) 150 else: 151 result_object = result 152 153 if result_object != None: 154 #FIXME check is used? 155 result_object.rank = self.rank() 156 self.return_result_command(result_object=result_object)
157 158
159 - def return_result_command(self, result_object):
160 raise_unimplemented(self.slave_queue_result)
161 162
163 - def slave_receive_commands(self):
165 166 167
168 -class Too_few_slaves_exception(Exception):
169 - def __init__(self):
170 msg = 'master slave processing requires at least 2 processors to run you only provided 1, exiting....' 171 Exception.__init__(self, msg)
172