
Source Code for Module multi.mpi4py_processor

###############################################################################
#                                                                             #
# Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    #
# Copyright (C) 2010-2012 Edward d'Auvergne                                   #
#                                                                             #
# This file is part of the program relax (http://www.nmr-relax.com).          #
#                                                                             #
# This program is free software: you can redistribute it and/or modify        #
# it under the terms of the GNU General Public License as published by        #
# the Free Software Foundation, either version 3 of the License, or           #
# (at your option) any later version.                                         #
#                                                                             #
# This program is distributed in the hope that it will be useful,             #
# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
# GNU General Public License for more details.                                #
#                                                                             #
# You should have received a copy of the GNU General Public License           #
# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
#                                                                             #
###############################################################################

# Module docstring.
"""The MPI processor fabric via the mpi4py Python implementation."""


# TODO: clone communicators & resize
# TODO: check exceptions on master

# Python module imports.
try:
    from mpi4py import MPI
except ImportError:
    MPI = None
import os
import sys
import textwrap

# relax module imports.
from multi.slave_commands import Exit_command
from multi.multi_processor_base import Multi_processor, Too_few_slaves_exception

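# Note on the processor layout:  as in most mpi4py programs, the processor with
# MPI.COMM_WORLD.rank == 0 acts as the master while ranks 1 to size-1 act as the
# slaves, so an MPI job launched with N processes yields one master and N-1 slave
# processors (a minimal stand-alone mpi4py sketch of this pattern is appended
# after this source listing).
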
class Mpi4py_processor(Multi_processor):
    """The mpi4py multi-processor class."""

    def __init__(self, processor_size, callback):
        """Initialise the mpi4py processor."""

        mpi_processor_size = MPI.COMM_WORLD.size - 1

        if processor_size == -1:
            processor_size = mpi_processor_size

        # FIXME: needs better support in relax, as this currently generates a stack trace.
        if mpi_processor_size == 0:
            raise Too_few_slaves_exception()

        msg = 'warning: mpi4py_processor is using 1 master and %d slave processors, but %d slaves were requested\n'
        if processor_size != mpi_processor_size:
            print(msg % (mpi_processor_size, processor_size))

        super(Mpi4py_processor, self).__init__(processor_size=mpi_processor_size, callback=callback)

        # Initialise a flag for determining if we are in the run() method or not.
        self.in_main_loop = False

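    # For example, launching the MPI job with 4 processes gives MPI.COMM_WORLD.size == 4,
    # so mpi_processor_size == 3 above, i.e. one master plus three slave processors.
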
    def _broadcast_command(self, command):
        """Send the given command to every slave processor via point-to-point MPI sends."""

        # Loop over the slave ranks (the master, rank 0, is excluded by the range).
        for i in range(1, MPI.COMM_WORLD.size):
            if i != 0:
                MPI.COMM_WORLD.send(obj=command, dest=i)

    def _ditch_all_results(self):
        """Receive and discard all outstanding results from every slave processor."""

        # Loop over the slave ranks.
        for i in range(1, MPI.COMM_WORLD.size):
            if i != 0:
                # Keep receiving from this slave until its final, completed result arrives.
                while True:
                    result = MPI.COMM_WORLD.recv(source=i)
                    if result.completed:
                        break

    def abort(self):
        """Abort the whole MPI job by calling MPI_Abort on COMM_WORLD."""
        MPI.COMM_WORLD.Abort()

    def assert_on_master(self):
        """Make sure that this is the master processor and not a slave.

        @raises Exception:  If not on the master processor.
        """

        # Check if this processor is a slave, and if so throw an exception.
        if self.on_slave():
            msg = 'running on a slave processor when the master (MPI.rank == 0) was expected, the rank is %d' % self.rank()
            raise Exception(msg)

    def exit(self, status=0):
        """Exit the mpi4py processor with the given status.

        @keyword status:    The program exit status.
        @type status:       int
        """

        # Execution on the slave.
        if MPI.COMM_WORLD.rank != 0:
            # Catch sys.exit being called on an executing slave.
            if self.in_main_loop:
                raise Exception('sys.exit unexpectedly called on slave!')

            # Catch sys.exit being called before the main loop has started.
            else:
                sys.stderr.write('\n')
                sys.stderr.write('***********************************************\n')
                sys.stderr.write('\n')
                sys.stderr.write('warning: sys.exit() called before the mpi4py main loop\n')
                sys.stderr.write('\n')
                sys.stderr.write('***********************************************\n')
                sys.stderr.write('\n')
                MPI.COMM_WORLD.Abort()

        # Execution on the master.
        else:
            # Slave clean up.
            if MPI.Is_initialized() and not MPI.Is_finalized() and MPI.COMM_WORLD.rank == 0:
                # Send the exit command to all slaves.
                self._broadcast_command(Exit_command())

                # Dump all results.
                self._ditch_all_results()

            # Exit the program with the given status.
            sys.exit(status)

    def get_intro_string(self):
        """Return the string to append to the end of the relax introduction string.

        @return:    The string describing this Processor fabric.
        @rtype:     str
        """

        # Get the specific MPI version.
        version_info = MPI.Get_version()

        # The vendor info.
        vendor = MPI.get_vendor()
        vendor_name = vendor[0]
        vendor_version = str(vendor[1][0])
        for i in range(1, len(vendor[1])):
            vendor_version = vendor_version + '.%i' % vendor[1][i]

        # Return the string.
        return "MPI %s.%s running via mpi4py with %i slave processors & 1 master. Using %s %s." % (version_info[0], version_info[1], self.processor_size(), vendor_name, vendor_version)

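    # For illustration, with an MPI 3.1 library, three slave processors and (say)
    # Open MPI 4.1.0 as the vendor, get_intro_string() would return something like:
    #     "MPI 3.1 running via mpi4py with 3 slave processors & 1 master. Using Open MPI 4.1.0."
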
    def get_name(self):
        """Return a name for this processor, built from the MPI processor name and the process ID."""
        return '%s-pid%s' % (MPI.Get_processor_name(), os.getpid())

    def master_queue_command(self, command, dest):
        """Slave to master processor data transfer - send the result command from the slave.

        @param command:     The results command to send to the master.
        @type command:      Results_command instance
        @param dest:        The destination processor's rank.
        @type dest:         int
        """

        # Use a basic MPI send call to transfer the result command.
        MPI.COMM_WORLD.send(obj=command, dest=dest)

    def master_receive_result(self):
        """Slave to master processor data transfer - receive the result command from the slave.

        This is invoked by the master processor.

        @return:    The result command sent by the slave.
        @rtype:     Result_command instance
        """

        # Catch and return the result command.
        return MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE)

    def rank(self):
        """Return the MPI rank of this processor in COMM_WORLD."""
        return MPI.COMM_WORLD.rank

    def return_result_command(self, result_object):
        """Send the result command from this slave back to the master (rank 0)."""
        MPI.COMM_WORLD.send(obj=result_object, dest=0)

    def run(self):
        """Run the processor main loop, flagging that the loop is executing."""
        self.in_main_loop = True
        super(Mpi4py_processor, self).run()
        self.in_main_loop = False

    def slave_receive_commands(self):
        """Receive the next command sent to this slave by the master (rank 0)."""
        return MPI.COMM_WORLD.recv(source=0)
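
For readers unfamiliar with the point-to-point pattern used throughout this module, the following stand-alone sketch mirrors it with plain mpi4py: rank 0 plays the master role, sending one command to each slave and collecting the results, while all other ranks play the slave role. The file name, the example command dictionaries and the mpiexec -n 4 invocation are illustrative assumptions only and are not part of relax or of this module.

    # master_slave_sketch.py - an illustrative sketch only, not part of the relax sources.
    # Run with an MPI launcher, for example:  mpiexec -n 4 python master_slave_sketch.py
    from mpi4py import MPI

    comm = MPI.COMM_WORLD

    if comm.rank == 0:
        # Master (rank 0): send one command to each slave, then collect one result per slave.
        for dest in range(1, comm.size):
            comm.send(obj={'op': 'square', 'value': dest}, dest=dest)
        for i in range(1, comm.size):
            result = comm.recv(source=MPI.ANY_SOURCE)
            print('Master received: %s' % result)
    else:
        # Slave (rank > 0): receive the command from the master and send a result back to rank 0.
        command = comm.recv(source=0)
        comm.send(obj={'rank': comm.rank, 'result': command['value']**2}, dest=0)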