Package multi :: Module mpi4py_processor
[hide private]
[frames] | no frames]

Source Code for Module multi.mpi4py_processor

  1  ############################################################################### 
  2  #                                                                             # 
  3  # Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    # 
  4  # Copyright (C) 2010-2012 Edward d'Auvergne                                   # 
  5  #                                                                             # 
  6  # This file is part of the program relax.                                     # 
  7  #                                                                             # 
  8  # relax is free software; you can redistribute it and/or modify               # 
  9  # it under the terms of the GNU General Public License as published by        # 
 10  # the Free Software Foundation; either version 2 of the License, or           # 
 11  # (at your option) any later version.                                         # 
 12  #                                                                             # 
 13  # relax is distributed in the hope that it will be useful,                    # 
 14  # but WITHOUT ANY WARRANTY; without even the implied warranty of              # 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               # 
 16  # GNU General Public License for more details.                                # 
 17  #                                                                             # 
 18  # You should have received a copy of the GNU General Public License           # 
 19  # along with relax; if not, write to the Free Software                        # 
 20  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA   # 
 21  #                                                                             # 
 22  ############################################################################### 
 23   
 24  # Module docstring. 
 25  """The MPI processor fabric via the mpi4py Python implementation.""" 
 26   
 27   
 28  # TODO: clone communicators & resize 
 29  # TODO: check exceptions on master 
 30   
 31  # Python module imports. 
 32  try: 
 33      from mpi4py import MPI 
 34  except ImportError: 
 35      MPI = None 
 36  import os 
 37  import sys 
 38  import textwrap 
 39   
 40  # relax module imports. 
 41  from multi.slave_commands import Exit_command 
 42  from multi.multi_processor_base import Multi_processor, Too_few_slaves_exception 
 43   
 44   
class Mpi4py_processor(Multi_processor):
    """The mpi4py multi-processor class.

    MPI rank 0 is treated as the master and every other rank in MPI.COMM_WORLD as a slave.
    """

    def __init__(self, processor_size, callback):
        """Initialise the mpi4py processor.

        The number of slaves is always taken from the MPI world size minus one.  The requested
        processor_size is only compared against that number to print a warning on mismatch.

        @param processor_size:              The requested number of slaves, or -1 to accept however
                                            many MPI provides.
        @type processor_size:               int
        @param callback:                    Passed through unchanged to the Multi_processor
                                            constructor.
        @raises Too_few_slaves_exception:   If the MPI world contains no rank other than the master.
        """

        # Every rank except the master is a slave.
        mpi_processor_size = MPI.COMM_WORLD.size - 1

        # A request of -1 means "use whatever MPI provides".
        if processor_size == -1:
            processor_size = mpi_processor_size

        # FIXME: needs better support in relax generates stack trace
        if mpi_processor_size == 0:
            raise Too_few_slaves_exception()

        # Warn when the requested slave count differs from what MPI provides.
        msg = 'warning: mpi4py_processor is using 1 masters and %d slave processors you requested %d slaves\n'
        if processor_size != mpi_processor_size:
            print(msg % (mpi_processor_size, processor_size))

        super(Mpi4py_processor, self).__init__(processor_size=mpi_processor_size, callback=callback)

        # Initialise a flag for determining if we are in the run() method or not.
        self.in_main_loop = False


    def _broadcast_command(self, command):
        """Send the same command object to every rank other than rank 0.

        @param command: The object to send to each slave.
        """

        # Starting the range at 1 skips the master, so no per-rank check is needed.
        for rank in range(1, MPI.COMM_WORLD.size):
            MPI.COMM_WORLD.send(obj=command, dest=rank)


    def _ditch_all_results(self):
        """Receive and discard results from every rank other than rank 0.

        For each rank in turn, objects are received until one has a true 'completed' attribute.
        """

        # Starting the range at 1 skips the master, so no per-rank check is needed.
        for rank in range(1, MPI.COMM_WORLD.size):
            # Drain this rank until it reports completion.
            while True:
                result = MPI.COMM_WORLD.recv(source=rank)
                if result.completed:
                    break


    def abort(self):
        """Call Abort() on the MPI world communicator."""

        MPI.COMM_WORLD.Abort()


    def assert_on_master(self):
        """Make sure that this is the master processor and not a slave.

        @raises Exception:  If not on the master processor.
        """

        # Check if this processor is a slave, and if so throw an exception.
        if self.on_slave():
            msg = 'running on slave when expected master with MPI.rank == 0, rank was %d' % self.rank()
            raise Exception(msg)


    def exit(self, status=0):
        """Exit the mpi4py processor with the given status.

        On a slave this never exits normally:  it raises if called from within run(), otherwise it
        writes a warning to stderr and aborts the MPI world.  On the master, the slaves are sent an
        Exit_command and their outstanding results drained (when MPI is initialised and not yet
        finalised) before sys.exit() is called.

        @keyword status:    The program exit status.
        @type status:       int
        @raises Exception:  If called on a slave while in_main_loop is set.
        """

        # Execution on the slave.
        if MPI.COMM_WORLD.rank != 0:
            # Catch sys.exit being called on an executing slave.
            if self.in_main_loop:
                raise Exception('sys.exit unexpectedly called on slave!')

            # Catch sys.exit
            else:
                sys.stderr.write(
                    '\n'
                    '***********************************************\n'
                    '\n'
                    'warning sys.exit called before mpi4py main loop\n'
                    '\n'
                    '***********************************************\n'
                    '\n'
                )
                MPI.COMM_WORLD.Abort()

        # Execution on the master.
        else:
            # Slave clean up (the rank is already known to be 0 in this branch).
            if MPI.Is_initialized() and not MPI.Is_finalized():
                # Send the exit command to all slaves.
                self._broadcast_command(Exit_command())

                # Dump all results.
                self._ditch_all_results()

            # Exit the program with the given status.
            sys.exit(status)


    def get_intro_string(self):
        """Return the string to append to the end of the relax introduction string.

        @return:    The string describing this Processor fabric.
        @rtype:     str
        """

        # Get the specific MPI version.
        version_info = MPI.Get_version()

        # The vendor info.
        vendor = MPI.get_vendor()
        vendor_name = vendor[0]

        # Dot-join the vendor version numbers (first element via str(), the rest via '%i').
        version_parts = [str(vendor[1][0])] + ['%i' % part for part in vendor[1][1:]]
        vendor_version = '.'.join(version_parts)

        # Return the string.
        return "MPI %s.%s running via mpi4py with %i slave processors & 1 master.  Using %s %s." % (version_info[0], version_info[1], self.processor_size(), vendor_name, vendor_version)


    def get_name(self):
        """Return an identifier built from the MPI processor name and the process ID.

        @return:    A string of the form '<MPI processor name>-pid<PID>'.
        @rtype:     str
        """

        return '%s-pid%s' % (MPI.Get_processor_name(), os.getpid())


    def master_queue_command(self, command, dest):
        """Send a command object to the processor of the given rank.

        @param command: The command to send.
        @param dest:    The destination processor's rank.
        @type dest:     int
        """

        # Use a basic MPI send call to transfer the command.
        MPI.COMM_WORLD.send(obj=command, dest=dest)


    def master_receive_result(self):
        """Slave to master processor data transfer - receive the result command from the slave.

        This is invoked by the master processor.  The receive accepts a message from any rank.

        @return:        The result command sent by the slave.
        @rtype:         Result_command instance
        """

        # Catch and return the result command.
        return MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE)


    def rank(self):
        """Return the rank of this process in MPI.COMM_WORLD.

        @return:    The MPI rank.
        @rtype:     int
        """

        return MPI.COMM_WORLD.rank


    def return_result_command(self, result_object):
        """Send a result object to rank 0.

        @param result_object:   The object to send to the master.
        """

        MPI.COMM_WORLD.send(obj=result_object, dest=0)


    def run(self):
        """Execute the base class run() method with the in_main_loop flag set."""

        # NOTE(review): the flag stays True if the base run() raises.  exit() reads it to detect a
        # slave exiting mid-loop, so confirm the intended behaviour before adding a try/finally.
        self.in_main_loop = True
        super(Mpi4py_processor, self).run()
        self.in_main_loop = False


    def slave_receive_commands(self):
        """Receive the next object sent from rank 0.

        @return:    The object received from the master.
        """

        return MPI.COMM_WORLD.recv(source=0)