Package multi :: Module multi_processor_base
[hide private]
[frames] | no frames]

Source Code for Module multi.multi_processor_base

  1  ############################################################################### 
  2  #                                                                             # 
  3  # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    # 
  4  # Copyright (C) 2011-2012 Edward d'Auvergne                                   # 
  5  #                                                                             # 
  6  # This file is part of the program relax (http://www.nmr-relax.com).          # 
  7  #                                                                             # 
  8  # This program is free software: you can redistribute it and/or modify        # 
  9  # it under the terms of the GNU General Public License as published by        # 
 10  # the Free Software Foundation, either version 3 of the License, or           # 
 11  # (at your option) any later version.                                         # 
 12  #                                                                             # 
 13  # This program is distributed in the hope that it will be useful,             # 
 14  # but WITHOUT ANY WARRANTY; without even the implied warranty of              # 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               # 
 16  # GNU General Public License for more details.                                # 
 17  #                                                                             # 
 18  # You should have received a copy of the GNU General Public License           # 
 19  # along with this program.  If not, see <http://www.gnu.org/licenses/>.       # 
 20  #                                                                             # 
 21  ############################################################################### 
 22   
 23  # Module docstring. 
 24  """Module containing a Processor base class to be used by any multi-processor fabric. 
 25   
 26  This is used by the mpi4py clustering code.  It can also be used by any new implementation 
 27  including, for example: 
 28   
 29      - Other implementations using different python MPI libraries (pypar, etc.). 
 30      - Use of ssh tunnels for parallel programming. 
 31      - Use of the twisted frame work for communication (http://twistedmatrix.com/projects/). 
 32      - The parallel virtual machine (pvm) via pypvm (http://pypvm.sourceforge.net). 
 33  """ 
 34   
 35  # Python module imports. 
 36  from copy import copy 
 37  import math 
 38  import sys 
 39  import threading 
 40  import traceback 
 41   
 42  # multi module imports. 
 43  from multi.misc import raise_unimplemented, Result, Result_string, Verbosity; verbosity = Verbosity() 
 44  from multi.processor import Processor 
 45  from multi.result_commands import Batched_result_command, Result_command, Result_exception 
 46   
 47   
48 -class Multi_processor(Processor):
49 """The multi-processor base class.""" 50
51 - def __init__(self, processor_size, callback):
52 super(Multi_processor, self).__init__(processor_size=processor_size, callback=callback) 53 54 self.do_quit = False 55 56 #FIXME un clone from uniprocessor 57 #command queue and memo queue 58 self.command_queue = [] 59 self.memo_map = {} 60 61 self.batched_returns = True 62 self.result_list = None
63 64 65 #TODO: move up a level
66 - def add_to_queue(self, command, memo=None):
67 self.command_queue.append(command) 68 if memo != None: 69 command.set_memo_id(memo) 70 self.memo_map[memo.memo_id()] = memo
71 72 73 #TODO: move up a level
74 - def chunk_queue(self, queue):
75 lqueue = copy(queue) 76 result = [] 77 processors = self.processor_size() 78 chunks = processors * self.grainyness 79 chunk_size = int(math.floor(float(len(queue)) / float(chunks))) 80 81 if chunk_size < 1: 82 result = queue 83 else: 84 for i in range(chunks): 85 result.append(lqueue[:chunk_size]) 86 del lqueue[:chunk_size] 87 for i, elem in enumerate(lqueue): 88 result[i].append(elem) 89 return result
90 91 92 # FIXME move to lower level
93 - def on_master(self):
94 if self.rank() == 0: 95 return True
96 97 98 # FIXME move to lower level
99 - def on_slave(self):
100 return not self.on_master()
101 102
103 - def post_run(self):
104 105 super(Multi_processor, self).post_run()
106 107
108 - def pre_run(self):
109 """Method called before starting the application main loop""" 110 111 # Execute the base class method. 112 super(Multi_processor, self).pre_run()
113 114 115 #FIXME: fill out generic result processing move to processor
116 - def process_result(self, result):
117 if isinstance(result, Result): 118 if isinstance(result, Result_command): 119 memo = None 120 if result.memo_id != None: 121 memo = self.memo_map[result.memo_id] 122 result.run(self, memo) 123 if result.memo_id != None and result.completed: 124 del self.memo_map[result.memo_id] 125 126 elif isinstance(result, Result_string): 127 #FIXME can't cope with multiple lines 128 sys.stdout.write(result.string) 129 else: 130 message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__, result) 131 raise Exception(message)
132 133 134 #TODO: move up a level add send and revieve virtual functions
135 - def return_object(self, result):
136 result_object = None 137 #raise Exception('dummy') 138 if isinstance(result, Result_exception): 139 result_object = result 140 elif self.batched_returns: 141 is_batch_result = isinstance(result, Batched_result_command) 142 143 if is_batch_result: 144 result_object = result 145 else: 146 if self.result_list != None: 147 self.result_list.append(result) 148 else: 149 result_object = result 150 151 if result_object != None: 152 #FIXME check is used? 153 result_object.rank = self.rank() 154 self.return_result_command(result_object=result_object)
155 156
157 - def return_result_command(self, result_object):
158 raise_unimplemented(self.slave_queue_result)
159 160
161 - def slave_receive_commands(self):
163 164 165
166 -class Too_few_slaves_exception(Exception):
167 - def __init__(self):
168 msg = 'master slave processing requires at least 2 processors to run you only provided 1, exiting....' 169 Exception.__init__(self, msg)
170