1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 """The MPI processor fabric via the mpi4py Python implementation."""
25
26
27
28
29
30
31 try:
32 from mpi4py import MPI
33 except ImportError:
34 MPI = None
35 import os
36 import sys
37
38
39 from multi.slave_commands import Exit_command
40 from multi.multi_processor_base import Multi_processor, Too_few_slaves_exception
41
42
44 """The mpi4py multi-processor class."""
45
46 - def __init__(self, processor_size, callback):
66
67
69 for i in range(1, MPI.COMM_WORLD.size):
70 if i != 0:
71 MPI.COMM_WORLD.send(obj=command, dest=i)
72
73
75 for i in range(1, MPI.COMM_WORLD.size):
76 if i != 0:
77 while True:
78 result = MPI.COMM_WORLD.recv(source=i)
79 if result.completed:
80 break
81
82
84 MPI.COMM_WORLD.Abort()
85
86
88 """Make sure that this is the master processor and not a slave.
89
90 @raises Exception: If not on the master processor.
91 """
92
93
94 if self.on_slave():
95 msg = 'running on slave when expected master with MPI.rank == 0, rank was %d'% self.rank()
96 raise Exception(msg)
97
98
99 - def exit(self, status=0):
100 """Exit the mpi4py processor with the given status.
101
102 @keyword status: The program exit status.
103 @type status: int
104 """
105
106
107 if MPI.COMM_WORLD.rank != 0:
108
109 if self.in_main_loop:
110 raise Exception('sys.exit unexpectedly called on slave!')
111
112
113 else:
114 sys.stderr.write('\n')
115 sys.stderr.write('***********************************************\n')
116 sys.stderr.write('\n')
117 sys.stderr.write('warning sys.exit called before mpi4py main loop\n')
118 sys.stderr.write('\n')
119 sys.stderr.write('***********************************************\n')
120 sys.stderr.write('\n')
121 MPI.COMM_WORLD.Abort()
122
123
124 else:
125
126 if MPI.Is_initialized() and not MPI.Is_finalized() and MPI.COMM_WORLD.rank == 0:
127
128 self._broadcast_command(Exit_command())
129
130
131 self._ditch_all_results()
132
133
134 sys.exit(status)
135
136
138 """Return the string to append to the end of the relax introduction string.
139
140 @return: The string describing this Processor fabric.
141 @rtype: str
142 """
143
144
145 version_info = MPI.Get_version()
146
147
148 vendor = MPI.get_vendor()
149 vendor_name = vendor[0]
150 vendor_version = str(vendor[1][0])
151 for i in range(1, len(vendor[1])):
152 vendor_version = vendor_version + '.%i' % vendor[1][i]
153
154
155 return "MPI %s.%s running via mpi4py with %i slave processors & 1 master. Using %s %s." % (version_info[0], version_info[1], self.processor_size(), vendor_name, vendor_version)
156
157
159 return '%s-pid%s' % (MPI.Get_processor_name(), os.getpid())
160
161
163 """Slave to master processor data transfer - send the result command from the slave.
164
165 @param command: The results command to send to the master.
166 @type command: Results_command instance
167 @param dest: The destination processor's rank.
168 @type dest: int
169 """
170
171
172 MPI.COMM_WORLD.send(obj=command, dest=dest)
173
174
176 """Slave to master processor data transfer - receive the result command from the slave.
177
178 This is invoked by the master processor.
179
180 @return: The result command sent by the slave.
181 @rtype: Result_command instance
182 """
183
184
185 return MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE)
186
187
190
191
193 MPI.COMM_WORLD.send(obj=result_object, dest=0)
194
195
197 self.in_main_loop = True
198 super(Mpi4py_processor, self).run()
199 self.in_main_loop = False
200
201
203 return MPI.COMM_WORLD.recv(source=0)
204