1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 """The MPI processor fabric via the mpi4py Python implementation."""
25
26
27
28
29
30
31 try:
32 from mpi4py import MPI
33 except ImportError:
34 MPI = None
35 import os
36 import sys
37 import textwrap
38
39
40 from multi.slave_commands import Exit_command
41 from multi.multi_processor_base import Multi_processor, Too_few_slaves_exception
42
43
45 """The mpi4py multi-processor class."""
46
47 - def __init__(self, processor_size, callback):
67
68
70 for i in range(1, MPI.COMM_WORLD.size):
71 if i != 0:
72 MPI.COMM_WORLD.send(obj=command, dest=i)
73
74
76 for i in range(1, MPI.COMM_WORLD.size):
77 if i != 0:
78 while True:
79 result = MPI.COMM_WORLD.recv(source=i)
80 if result.completed:
81 break
82
83
85 MPI.COMM_WORLD.Abort()
86
87
89 """Make sure that this is the master processor and not a slave.
90
91 @raises Exception: If not on the master processor.
92 """
93
94
95 if self.on_slave():
96 msg = 'running on slave when expected master with MPI.rank == 0, rank was %d'% self.rank()
97 raise Exception(msg)
98
99
100 - def exit(self, status=0):
101 """Exit the mpi4py processor with the given status.
102
103 @keyword status: The program exit status.
104 @type status: int
105 """
106
107
108 if MPI.COMM_WORLD.rank != 0:
109
110 if self.in_main_loop:
111 raise Exception('sys.exit unexpectedly called on slave!')
112
113
114 else:
115 sys.stderr.write('\n')
116 sys.stderr.write('***********************************************\n')
117 sys.stderr.write('\n')
118 sys.stderr.write('warning sys.exit called before mpi4py main loop\n')
119 sys.stderr.write('\n')
120 sys.stderr.write('***********************************************\n')
121 sys.stderr.write('\n')
122 MPI.COMM_WORLD.Abort()
123
124
125 else:
126
127 if MPI.Is_initialized() and not MPI.Is_finalized() and MPI.COMM_WORLD.rank == 0:
128
129 self._broadcast_command(Exit_command())
130
131
132 self._ditch_all_results()
133
134
135 sys.exit(status)
136
137
139 """Return the string to append to the end of the relax introduction string.
140
141 @return: The string describing this Processor fabric.
142 @rtype: str
143 """
144
145
146 version_info = MPI.Get_version()
147
148
149 vendor = MPI.get_vendor()
150 vendor_name = vendor[0]
151 vendor_version = str(vendor[1][0])
152 for i in range(1, len(vendor[1])):
153 vendor_version = vendor_version + '.%i' % vendor[1][i]
154
155
156 return "MPI %s.%s running via mpi4py with %i slave processors & 1 master. Using %s %s." % (version_info[0], version_info[1], self.processor_size(), vendor_name, vendor_version)
157
158
160 return '%s-pid%s' % (MPI.Get_processor_name(), os.getpid())
161
162
164 """Slave to master processor data transfer - send the result command from the slave.
165
166 @param command: The results command to send to the master.
167 @type command: Results_command instance
168 @param dest: The destination processor's rank.
169 @type dest: int
170 """
171
172
173 MPI.COMM_WORLD.send(obj=command, dest=dest)
174
175
177 """Slave to master processor data transfer - receive the result command from the slave.
178
179 This is invoked by the master processor.
180
181 @return: The result command sent by the slave.
182 @rtype: Result_command instance
183 """
184
185
186 return MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE)
187
188
190 return MPI.COMM_WORLD.rank
191
192
194 MPI.COMM_WORLD.send(obj=result_object, dest=0)
195
196
198 self.in_main_loop = True
199 super(Mpi4py_processor, self).run()
200 self.in_main_loop = False
201
202
204 return MPI.COMM_WORLD.recv(source=0)
205