1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 """Module containing a Processor base class to be used by any multi-processor fabric.
25
26 This is used by the mpi4py clustering code. It can also be used by any new implementation
27 including, for example:
28
29 - Other implementations using different Python MPI libraries (pypar, etc.).
30 - Use of ssh tunnels for parallel programming.
31 - Use of the Twisted framework for communication (http://twistedmatrix.com/projects/).
32 - The parallel virtual machine (pvm) via pypvm (http://pypvm.sourceforge.net).
33 """
34
35
36 from copy import copy
37 import math
38 import sys
39
40
41 from multi.misc import raise_unimplemented, Result, Result_string, Verbosity; verbosity = Verbosity()
42 from multi.processor import Processor
43 from multi.result_commands import Batched_result_command, Result_command, Result_exception
44
45
47 """The multi-processor base class."""
48
49 - def __init__(self, processor_size, callback):
61
62
63
69
70
71
73 lqueue = copy(queue)
74 result = []
75 processors = self.processor_size()
76 chunks = processors * self.grainyness
77 chunk_size = int(math.floor(float(len(queue)) / float(chunks)))
78
79 if chunk_size < 1:
80 result = queue
81 else:
82 for i in range(chunks):
83 result.append(lqueue[:chunk_size])
84 del lqueue[:chunk_size]
85 for i, elem in enumerate(lqueue):
86 result[i].append(elem)
87 return result
88
89
90
92 if self.rank() == 0:
93 return True
94
95
96
99
100
def post_run(self):
    """Delegate to the parent class's post_run() implementation."""

    # Explicit two-argument super() form, matching the rest of this class.
    super(Multi_processor, self).post_run()
104
105
107 """Method called before starting the application main loop"""
108
109
110 super(Multi_processor, self).pre_run()
111
112
113
130
131
132
134 result_object = None
135
136 if isinstance(result, Result_exception):
137 result_object = result
138 elif self.batched_returns:
139 is_batch_result = isinstance(result, Batched_result_command)
140
141 if is_batch_result:
142 result_object = result
143 else:
144 if self.result_list != None:
145 self.result_list.append(result)
146 else:
147 result_object = result
148
149 if result_object != None:
150
151 result_object.rank = self.rank()
152 self.return_result_command(result_object=result_object)
153
154
157
158
161
162
163
166 msg = 'master slave processing requires at least 2 processors to run you only provided 1, exiting....'
167 Exception.__init__(self, msg)
168