1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 """Module containing a Processor base class to be used by any multi-processor fabric.
26
27 This is used by the mpi4py clustering code. It can also be used by any new implementation
28 including, for example:
29
30 - Other implementations using different python MPI libraries (pypar, etc.).
31 - Use of ssh tunnels for parallel programming.
32 - Use of the twisted frame work for communication (http://twistedmatrix.com/projects/).
33 - The parallel virtual machine (pvm) via pypvm (http://pypvm.sourceforge.net).
34 """
35
36
37 import Queue
38 from copy import copy
39 import math
40 import sys
41 import threading
42 import traceback
43
44
45 from multi.misc import raise_unimplemented, Result, Result_string, Verbosity; verbosity = Verbosity()
46 from multi.processor import Processor
47 from multi.result_commands import Batched_result_command, Result_command, Result_exception
48
49
51 """The multi-processor base class."""
52
53 - def __init__(self, processor_size, callback):
65
66
67
73
74
75
77 lqueue = copy(queue)
78 result = []
79 processors = self.processor_size()
80 chunks = processors * self.grainyness
81 chunk_size = int(math.floor(float(len(queue)) / float(chunks)))
82
83 if chunk_size < 1:
84 result = queue
85 else:
86 for i in range(chunks):
87 result.append(lqueue[:chunk_size])
88 del lqueue[:chunk_size]
89 for i, elem in enumerate(lqueue):
90 result[i].append(elem)
91 return result
92
93
94
96 if self.rank() == 0:
97 return True
98
99
100
103
104
105 - def post_run(self):
106
107 super(Multi_processor, self).post_run()
108
109
111 """Method called before starting the application main loop"""
112
113
114 super(Multi_processor, self).pre_run()
115
116
117
119 if isinstance(result, Result):
120 if isinstance(result, Result_command):
121 memo = None
122 if result.memo_id != None:
123 memo = self.memo_map[result.memo_id]
124 result.run(self, memo)
125 if result.memo_id != None and result.completed:
126 del self.memo_map[result.memo_id]
127
128 elif isinstance(result, Result_string):
129
130 sys.stdout.write(result.string)
131 else:
132 message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__, result)
133 raise Exception(message)
134
135
136
138 result_object = None
139
140 if isinstance(result, Result_exception):
141 result_object = result
142 elif self.batched_returns:
143 is_batch_result = isinstance(result, Batched_result_command)
144
145 if is_batch_result:
146 result_object = result
147 else:
148 if self.result_list != None:
149 self.result_list.append(result)
150 else:
151 result_object = result
152
153 if result_object != None:
154
155 result_object.rank = self.rank()
156 self.return_result_command(result_object=result_object)
157
158
161
162
165
166
167
170 msg = 'master slave processing requires at least 2 processors to run you only provided 1, exiting....'
171 Exception.__init__(self, msg)
172