1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 """The MPI processor fabric via the mpi4py Python implementation."""
26
27
28
29
30
31
32 try:
33 from mpi4py import MPI
34 except ImportError:
35 MPI = None
36 import os
37 import sys
38 import textwrap
39
40
41 from multi.slave_commands import Exit_command
42 from multi.multi_processor_base import Multi_processor, Too_few_slaves_exception
43
44
46 """The mpi4py multi-processor class."""
47
48 - def __init__(self, processor_size, callback):
68
69
71 for i in range(1, MPI.COMM_WORLD.size):
72 if i != 0:
73 MPI.COMM_WORLD.send(obj=command, dest=i)
74
75
77 for i in range(1, MPI.COMM_WORLD.size):
78 if i != 0:
79 while True:
80 result = MPI.COMM_WORLD.recv(source=i)
81 if result.completed:
82 break
83
84
86 MPI.COMM_WORLD.Abort()
87
88
90 """Make sure that this is the master processor and not a slave.
91
92 @raises Exception: If not on the master processor.
93 """
94
95
96 if self.on_slave():
97 msg = 'running on slave when expected master with MPI.rank == 0, rank was %d'% self.rank()
98 raise Exception(msg)
99
100
def exit(self, status=0):
    """Exit the mpi4py processor with the given status.

    On the master processor, all slaves are first sent an Exit_command and
    any results they return while shutting down are discarded before the
    normal Python exit.  On a slave processor, calling this inside the main
    loop is an error; outside of the main loop a warning is printed and the
    whole MPI job is aborted.

    @keyword status:    The program exit status.
    @type status:       int
    @raises Exception:  If called on a slave processor while it is executing
                        inside the main loop.
    """

    # Slave processors.
    if MPI.COMM_WORLD.rank != 0:
        # sys.exit() must never be called while a slave is executing commands.
        if self.in_main_loop:
            raise Exception('sys.exit unexpectedly called on slave!')

        # Outside of the main loop - warn loudly, then bring down the entire MPI universe.
        else:
            sys.stderr.write('\n')
            sys.stderr.write('***********************************************\n')
            sys.stderr.write('\n')
            sys.stderr.write('warning sys.exit called before mpi4py main loop\n')
            sys.stderr.write('\n')
            sys.stderr.write('***********************************************\n')
            sys.stderr.write('\n')
            MPI.COMM_WORLD.Abort()

    # Master processor.
    else:
        # Clean up the slaves, but only while MPI is still usable.  The
        # original additionally re-tested MPI.COMM_WORLD.rank == 0 here,
        # which is redundant - this branch is only reachable on rank 0.
        if MPI.Is_initialized() and not MPI.Is_finalized():
            # Send the command for all slaves to exit their main loops.
            self._broadcast_command(Exit_command())

            # Discard any results the slaves send back while exiting.
            self._ditch_all_results()

        # Terminate the program with the requested status.
        sys.exit(status)
137
138
140 """Return the string to append to the end of the relax introduction string.
141
142 @return: The string describing this Processor fabric.
143 @rtype: str
144 """
145
146
147 version_info = MPI.Get_version()
148
149
150 vendor = MPI.get_vendor()
151 vendor_name = vendor[0]
152 vendor_version = str(vendor[1][0])
153 for i in range(1, len(vendor[1])):
154 vendor_version = vendor_version + '.%i' % vendor[1][i]
155
156
157 return "MPI %s.%s running via mpi4py with %i slave processors & 1 master. Using %s %s." % (version_info[0], version_info[1], self.processor_size(), vendor_name, vendor_version)
158
159
161 return '%s-pid%s' % (MPI.Get_processor_name(), os.getpid())
162
163
165 """Slave to master processor data transfer - send the result command from the slave.
166
167 @param command: The results command to send to the master.
168 @type command: Results_command instance
169 @param dest: The destination processor's rank.
170 @type dest: int
171 """
172
173
174 MPI.COMM_WORLD.send(obj=command, dest=dest)
175
176
178 """Slave to master processor data transfer - receive the result command from the slave.
179
180 This is invoked by the master processor.
181
182 @return: The result command sent by the slave.
183 @rtype: Result_command instance
184 """
185
186
187 return MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE)
188
189
191 return MPI.COMM_WORLD.rank
192
193
195 MPI.COMM_WORLD.send(obj=result_object, dest=0)
196
197
199 self.in_main_loop = True
200 super(Mpi4py_processor, self).run()
201 self.in_main_loop = False
202
203
205 return MPI.COMM_WORLD.recv(source=0)
206