1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24  """Module containing a Processor base class to be used by any multi-processor fabric. 
 25   
 26  This is used by the mpi4py clustering code.  It can also be used by any new implementation 
 27  including, for example: 
 28   
 29      - Other implementations using different Python MPI libraries (pypar, etc.). 
 30      - Use of ssh tunnels for parallel programming. 
 31      - Use of the Twisted framework for communication (http://twistedmatrix.com/projects/). 
 32      - The parallel virtual machine (pvm) via pypvm (http://pypvm.sourceforge.net). 
 33  """ 
 34   
 35   
 36  from copy import copy 
 37  import math 
 38  import sys 
 39  import threading 
 40  import traceback 
 41   
 42   
 43  from multi.misc import raise_unimplemented, Result, Result_string, Verbosity; verbosity = Verbosity() 
 44  from multi.processor import Processor 
 45  from multi.result_commands import Batched_result_command, Result_command, Result_exception 
 46   
 47   
 49      """The multi-processor base class.""" 
 50   
 51 -    def __init__(self, processor_size, callback): 
  63   
 64   
 65       
 71   
 72   
 73       
 75          lqueue = copy(queue) 
 76          result = [] 
 77          processors = self.processor_size() 
 78          chunks = processors * self.grainyness 
 79          chunk_size = int(math.floor(float(len(queue)) / float(chunks))) 
 80   
 81          if chunk_size < 1: 
 82              result = queue 
 83          else: 
 84              for i in range(chunks): 
 85                  result.append(lqueue[:chunk_size]) 
 86                  del lqueue[:chunk_size] 
 87              for i, elem in enumerate(lqueue): 
 88                  result[i].append(elem) 
 89          return result 
  90   
 91   
 92       
 94          if self.rank() == 0: 
 95              return True 
  96   
 97   
 98       
101   
102   
103 -    def post_run(self): 
 104   
105          super(Multi_processor, self).post_run() 
 106   
107   
109          """Method called before starting the application main loop""" 
110   
111           
112          super(Multi_processor, self).pre_run() 
 113   
114   
115       
117          if isinstance(result, Result): 
118              if isinstance(result, Result_command): 
119                  memo = None 
120                  if result.memo_id != None: 
121                      memo = self.memo_map[result.memo_id] 
122                  result.run(self, memo) 
123                  if result.memo_id != None and result.completed: 
124                      del self.memo_map[result.memo_id] 
125   
126              elif isinstance(result, Result_string): 
127                   
128                  sys.stdout.write(result.string) 
129          else: 
130              message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__, result) 
131              raise Exception(message) 
 132   
133   
134       
136          result_object = None 
137           
138          if isinstance(result, Result_exception): 
139              result_object = result 
140          elif self.batched_returns: 
141              is_batch_result = isinstance(result, Batched_result_command) 
142   
143              if is_batch_result: 
144                  result_object = result 
145              else: 
146                  if self.result_list != None: 
147                      self.result_list.append(result) 
148          else: 
149              result_object = result 
150   
151          if result_object != None: 
152               
153              result_object.rank = self.rank() 
154              self.return_result_command(result_object=result_object) 
 155   
156   
159   
160   
 163   
164   
165   
168          msg = 'master slave processing requires at least 2 processors to run you only provided 1, exiting....' 
169          Exception.__init__(self, msg) 
  170