1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 """Module containing a Processor base class to be used by any multi-processor fabric.
25
26 This is used by the mpi4py clustering code. It can also be used by any new implementation
27 including, for example:
28
29 - Other implementations using different python MPI libraries (pypar, etc.).
30 - Use of ssh tunnels for parallel programming.
31 - Use of the twisted frame work for communication (http://twistedmatrix.com/projects/).
32 - The parallel virtual machine (pvm) via pypvm (http://pypvm.sourceforge.net).
33 """
34
35
36 from copy import copy
37 import math
38 import sys
39 import threading
40 import traceback
41
42
43 from multi.misc import raise_unimplemented, Result, Result_string, Verbosity; verbosity = Verbosity()
44 from multi.processor import Processor
45 from multi.result_commands import Batched_result_command, Result_command, Result_exception
46
47
49 """The multi-processor base class."""
50
51 - def __init__(self, processor_size, callback):
63
64
65
71
72
73
75 lqueue = copy(queue)
76 result = []
77 processors = self.processor_size()
78 chunks = processors * self.grainyness
79 chunk_size = int(math.floor(float(len(queue)) / float(chunks)))
80
81 if chunk_size < 1:
82 result = queue
83 else:
84 for i in range(chunks):
85 result.append(lqueue[:chunk_size])
86 del lqueue[:chunk_size]
87 for i, elem in enumerate(lqueue):
88 result[i].append(elem)
89 return result
90
91
92
94 if self.rank() == 0:
95 return True
96
97
98
101
102
103 - def post_run(self):
104
105 super(Multi_processor, self).post_run()
106
107
109 """Method called before starting the application main loop"""
110
111
112 super(Multi_processor, self).pre_run()
113
114
115
117 if isinstance(result, Result):
118 if isinstance(result, Result_command):
119 memo = None
120 if result.memo_id != None:
121 memo = self.memo_map[result.memo_id]
122 result.run(self, memo)
123 if result.memo_id != None and result.completed:
124 del self.memo_map[result.memo_id]
125
126 elif isinstance(result, Result_string):
127
128 sys.stdout.write(result.string)
129 else:
130 message = 'Unexpected result type \n%s \nvalue%s' %(result.__class__.__name__, result)
131 raise Exception(message)
132
133
134
136 result_object = None
137
138 if isinstance(result, Result_exception):
139 result_object = result
140 elif self.batched_returns:
141 is_batch_result = isinstance(result, Batched_result_command)
142
143 if is_batch_result:
144 result_object = result
145 else:
146 if self.result_list != None:
147 self.result_list.append(result)
148 else:
149 result_object = result
150
151 if result_object != None:
152
153 result_object.rank = self.rank()
154 self.return_result_command(result_object=result_object)
155
156
159
160
163
164
165
168 msg = 'master slave processing requires at least 2 processors to run you only provided 1, exiting....'
169 Exception.__init__(self, msg)
170