Package multi :: Module multi_processor_base
[hide private]
[frames] | [no frames]

Source Code for Module multi.multi_processor_base

  1  ############################################################################### 
  2  #                                                                             # 
  3  # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    # 
  4  # Copyright (C) 2011-2013 Edward d'Auvergne                                   # 
  5  #                                                                             # 
  6  # This file is part of the program relax (http://www.nmr-relax.com).          # 
  7  #                                                                             # 
  8  # This program is free software: you can redistribute it and/or modify        # 
  9  # it under the terms of the GNU General Public License as published by        # 
 10  # the Free Software Foundation, either version 3 of the License, or           # 
 11  # (at your option) any later version.                                         # 
 12  #                                                                             # 
 13  # This program is distributed in the hope that it will be useful,             # 
 14  # but WITHOUT ANY WARRANTY; without even the implied warranty of              # 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               # 
 16  # GNU General Public License for more details.                                # 
 17  #                                                                             # 
 18  # You should have received a copy of the GNU General Public License           # 
 19  # along with this program.  If not, see <http://www.gnu.org/licenses/>.       # 
 20  #                                                                             # 
 21  ############################################################################### 
 22   
 23  # Module docstring. 
 24  """Module containing a Processor base class to be used by any multi-processor fabric. 
 25   
 26  This is used by the mpi4py clustering code.  It can also be used by any new implementation 
 27  including, for example: 
 28   
 29      - Other implementations using different python MPI libraries (pypar, etc.). 
 30      - Use of ssh tunnels for parallel programming. 
  31      - Use of the Twisted framework for communication (http://twistedmatrix.com/projects/). 
 32      - The parallel virtual machine (pvm) via pypvm (http://pypvm.sourceforge.net). 
 33  """ 
 34   
 35  # Python module imports. 
 36  from copy import copy 
 37  import math 
 38  import sys 
 39   
 40  # multi module imports. 
 41  from multi.misc import raise_unimplemented, Result, Result_string, Verbosity; verbosity = Verbosity() 
 42  from multi.processor import Processor 
 43  from multi.result_commands import Batched_result_command, Result_command, Result_exception 
 44   
 45   
class Multi_processor(Processor):
    """The multi-processor base class."""

    def __init__(self, processor_size, callback):
        """Initialise the multi-processor internals.

        @param processor_size:  The number of processors in the fabric.
        @type processor_size:   int
        @param callback:        The callback object passed through to the Processor base class.
        @type callback:         Application_callback instance
        """

        # Execute the base class set up.
        super(Multi_processor, self).__init__(processor_size=processor_size, callback=callback)

        # Flag signalling that the processor should terminate.
        self.do_quit = False

        #FIXME un clone from uniprocessor
        # The command queue and the map from memo IDs to memo objects.
        self.command_queue = []
        self.memo_map = {}

        # Batched result handling state - results accumulate in result_list when batching.
        self.batched_returns = True
        self.result_list = None
61 62 63 #TODO: move up a level
64 - def add_to_queue(self, command, memo=None):
65 self.command_queue.append(command) 66 if memo != None: 67 command.set_memo_id(memo) 68 self.memo_map[memo.memo_id()] = memo
69 70 71 #TODO: move up a level
72 - def chunk_queue(self, queue):
73 lqueue = copy(queue) 74 result = [] 75 processors = self.processor_size() 76 chunks = processors * self.grainyness 77 chunk_size = int(math.floor(float(len(queue)) / float(chunks))) 78 79 if chunk_size < 1: 80 result = queue 81 else: 82 for i in range(chunks): 83 result.append(lqueue[:chunk_size]) 84 del lqueue[:chunk_size] 85 for i, elem in enumerate(lqueue): 86 result[i].append(elem) 87 return result
88 89 90 # FIXME move to lower level
91 - def on_master(self):
92 if self.rank() == 0: 93 return True
94 95 96 # FIXME move to lower level
97 - def on_slave(self):
98 return not self.on_master()
99 100
    def post_run(self):
        """Method called after the application main loop has finished.

        Pure delegation to the Processor base class implementation.
        """

        # Execute the base class method.
        super(Multi_processor, self).post_run()
104 105
    def pre_run(self):
        """Method called before starting the application main loop.

        Pure delegation to the Processor base class implementation.
        """

        # Execute the base class method.
        super(Multi_processor, self).pre_run()
111 112 113 #FIXME: fill out generic result processing move to processor
114 - def process_result(self, result):
115 if isinstance(result, Result): 116 if isinstance(result, Result_command): 117 memo = None 118 if result.memo_id != None: 119 memo = self.memo_map[result.memo_id] 120 result.run(self, memo) 121 if result.memo_id != None and result.completed: 122 del self.memo_map[result.memo_id] 123 124 elif isinstance(result, Result_string): 125 #FIXME can't cope with multiple lines 126 sys.stdout.write(result.string) 127 else: 128 message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__, result) 129 raise Exception(message)
130 131 132 #TODO: move up a level add send and revieve virtual functions
133 - def return_object(self, result):
134 result_object = None 135 #raise Exception('dummy') 136 if isinstance(result, Result_exception): 137 result_object = result 138 elif self.batched_returns: 139 is_batch_result = isinstance(result, Batched_result_command) 140 141 if is_batch_result: 142 result_object = result 143 else: 144 if self.result_list != None: 145 self.result_list.append(result) 146 else: 147 result_object = result 148 149 if result_object != None: 150 #FIXME check is used? 151 result_object.rank = self.rank() 152 self.return_result_command(result_object=result_object)
153 154
155 - def return_result_command(self, result_object):
156 raise_unimplemented(self.slave_queue_result)
157 158
159 - def slave_receive_commands(self):
161 162 163
class Too_few_slaves_exception(Exception):
    """Exception raised when fewer than 2 processors are available for master-slave processing."""

    def __init__(self):
        """Initialise the exception with its fixed error message."""

        # The message text is unchanged from the original implementation.
        super(Too_few_slaves_exception, self).__init__('master slave processing requires at least 2 processors to run you only provided 1, exiting....')
168