
Source Code for Module multi.processor

###############################################################################
#                                                                             #
# Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    #
# Copyright (C) 2011-2012 Edward d'Auvergne                                   #
#                                                                             #
# This file is part of the program relax.                                     #
#                                                                             #
# relax is free software; you can redistribute it and/or modify               #
# it under the terms of the GNU General Public License as published by        #
# the Free Software Foundation; either version 2 of the License, or           #
# (at your option) any later version.                                         #
#                                                                             #
# relax is distributed in the hope that it will be useful,                    #
# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
# GNU General Public License for more details.                                #
#                                                                             #
# You should have received a copy of the GNU General Public License           #
# along with relax; if not, write to the Free Software                        #
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA   #
#                                                                             #
###############################################################################

# Module docstring.
"""The processor class is the central class in the multi Python multiprocessor framework.

Overview
========

The framework has two main responsibilities:

     1. Process management - if needed, the processor can create the slave processes it manages if
        they haven't been created by the operating system.  It is also responsible for reporting
        exceptions and shutting down the multiprocessor in the face of errors.
     2. Scheduling commands on the slave processors via an interprocess communication fabric (MPI,
        PVM, threads, etc.) and processing returned text and result commands.


Using the processor framework
=============================

Users of the processor framework will typically use the following methodology:

     1. At application startup, determine the name of the required processor implementation and the
        number of slave processors requested.

     2. Create an Application_callback object.  For example:
            relax_instance = Relax()
            callbacks = Application_callback(master=relax_instance)

     3. Dynamically load a processor implementation using the name of the processor and the number
        of required slave processors.  For example:
            processor = Processor.load_multiprocessor(relax_instance.multiprocessor_type, callbacks, processor_size=relax_instance.n_processors)

     4. Call run on the processor instance returned above and handle all Exceptions.  For example:
            processor.run()

     5. After calling run, the processor will call back to Application_callback.init_master, from
        which you should call your main program (Application_callback defaults to
        self.master.run()).

     6. Once in the main program, you should call processor.add_to_queue with a series of
        multi.Slave_command objects you wish to be run across the slave processor pool and then
        call processor.run_queue to actually execute the commands remotely while blocking.  An
        illustrative sketch is given after this list.

     7. The Slave_commands will then run remotely on the slaves.  Any thrown exceptions and
        processor.result_commands queued to processor.return_object will be returned to the master
        processor and handled or executed.  The slave processors also provide facilities for
        capturing the STDERR and STDOUT streams and returning their contents as strings for display
        on the master's STDOUT and STDERR streams.
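
     A minimal, illustrative sketch of steps 2 to 6 might look as follows.  Here the Relax class
     comes from the host application, while My_slave_command, My_memo and data_chunks are
     hypothetical placeholder names for your own objects:

         relax_instance = Relax()
         callbacks = Application_callback(master=relax_instance)
         processor = Processor.load_multiprocessor('mpi4py', callbacks, processor_size=4)
         processor.run()    # Calls back to Application_callback.init_master.

         # Later, inside the main program started by init_master:
         for i, chunk in enumerate(data_chunks):
             processor.add_to_queue(My_slave_command(data=chunk), memo=My_memo(index=i))
         processor.run_queue()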


Extending the processor framework with a new interprocess communication fabric
==============================================================================

The processor class acts as a base class that defines all the commands that a processor
implementing a new interprocess communication fabric needs.  All that is required is to implement a
subclass of Processor providing the required methods (of course, as Python provides dynamic typing
and polymorphism ('duck typing'), you can always implement a class with the same set of methods and
it will also work).  Currently, processor classes are loaded by name from the multi package using
paths of the form:

>>> multi.<type>_processor.<Type>_processor

where <Type> is the name of the processor with the correct capitalisation, e.g.

>>> processor_name = 'mpi4py'
>>> callback = My_application_callback()
>>> processor_size = 6
>>> Processor.load_multiprocessor(processor_name, callback, processor_size)

will load multi.mpi4py_processor.Mpi4py_processor.
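
As an illustration only, the skeleton of a new fabric implementation might look something like the
following sketch.  The module name multi.fifo_processor and the class name Fifo_processor are
hypothetical, and only a few of the required methods are shown as stubs:

    # multi/fifo_processor.py - a hypothetical sketch, not a working fabric.
    from multi.misc import raise_unimplemented
    from multi.processor import Processor

    class Fifo_processor(Processor):

        def get_intro_string(self):
            return 'FIFO fabric with %i slave processors & 1 master' % self.processor_size()

        def rank(self):
            # Return 0 on the master and 1 to n on the slaves.
            raise_unimplemented(self.rank)

        def get_name(self):
            # Return a unique, human readable identifier for the current process.
            raise_unimplemented(self.get_name)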


TODO
====

The following are yet to be implemented:

    1. There is no ability for the processor to request command line arguments.

    2. The processor can't currently be loaded from somewhere other than the multi directory.

"""

#FIXME: better requirement of inherited commands.
#TODO: check exceptions on master.

# Python module imports.
import time, datetime, math, sys

# multi module imports.
from multi.misc import Capturing_exception, raise_unimplemented, Verbosity; verbosity = Verbosity()
from multi.result_queue import Immediate_result_queue, Threaded_result_queue
from multi.processor_io import Redirect_text
from multi.result_commands import Batched_result_command, Null_result_command, Result_exception
from multi.slave_commands import Slave_storage_command

111   
class Data_store:
    """A special Processor specific data storage container."""


class Processor(object):
    """The central class of the multi processor framework.

    This provides facilities for process management, command queueing, command scheduling, remote
    execution of commands, and handling of results and errors from commands.  The class is abstract
    and should be overridden to implement new interprocess communication methods.  However, even
    then users are encouraged to override the more fully implemented
    multi.multi_processor.Multi_processor class.  Most users should instantiate instances of this
    class by calling the static method Processor.load_multiprocessor.

    The class is designed to be subclassed and has abstract methods that a subclass needs to
    override.  Methods which can be overridden are clearly marked with a note annotation stating
    that they can be overridden.

    @todo:  It may be a good idea to separate out the features of the class that purely deal with
            the interprocess communication fabric.
    @todo:  The processor can't currently harvest the required command line arguments from the
            current command line.
    """


    def __init__(self, processor_size, callback):
        """Initialise the processor.

        @param processor_size:  The requested number of slave processors.  If the number of
                                processors is set by the environment (e.g. in the case of MPI via
                                'mpiexec -np <n-processors>' on the command line), the processor is
                                free to ignore this value.  The default value from the command line
                                is -1, and subclasses receiving this value should either raise an
                                exception or determine the correct number of slaves to create (e.g.
                                on a multi-core machine using a threaded implementation, the
                                correct number of slaves would equal the number of cores available).
        @type processor_size:   int
        @param callback:        The application callback which allows the host application to start
                                its main loop and handle exceptions from the processor.
        @type callback:         multi.processor.Application_callback instance
        """

        self.callback = callback
        """Callback to interface to the host application.

        @see: Application_callback."""

        self.grainyness = 1
        """The number of sub jobs to queue for each processor if we have more jobs than processors."""

        # # CHECKME: am I implemented?, should I be an application callback function
        # self.pre_queue_command = None
        # """ command to call before the queue is run"""
        # # CHECKME: am I implemented?, should I be an application callback function
        # self.post_queue_command = None
        # """ command to call after the queue has completed running"""
        #
        #CHECKME: should I be a singleton
        self.NULL_RESULT = Null_result_command(processor=self)
        """Empty result command used by commands which do not return a result (a singleton?)."""

        # Initialise the processor specific data store.
        self.data_store = Data_store()
        """The processor data store."""

        self._processor_size = processor_size
        """Number of slave processors available in this processor."""

        self.threaded_result_processing = True
        """Flag for the handling of result processing via self.run_command_queue()."""


    def abort(self):
        """Shutdown the multi processor in exceptional conditions - designed for overriding.

        This method is called after an exception from the master or slave has been raised and
        processed, and is responsible for the shutdown of the multi processor fabric and the
        termination of the application.  This method should be called as the last thing that
        Application_callback.handle_exception does.

        As an example of the method's use, see Mpi4py_processor.abort which calls
        MPI.COMM_WORLD.Abort() to cleanly shutdown the MPI framework and remove dangling processes.

        The default action is to call the special self.exit() method.

        @see: multi.processor.Application_callback.
        @see: multi.mpi4py_processor.Mpi4py_processor.abort().
        @see: mpi4py.MPI.COMM_WORLD.Abort().
        """

        self.exit()


    def add_to_queue(self, command, memo=None):
        """Add a command for remote execution to the queue - an abstract method.

        @see: multi.processor.Slave_command
        @see: multi.processor.Result_command
        @see: multi.processor.Memo

        @param command: A command to execute on a slave processor.
        @type command:  ? subclass instance
        @keyword memo:  A place to store data needed on command completion (e.g. where to save the
                        results).  The data stored in the memo is provided to the Result_commands
                        generated by the submitted command.
        @type memo:     Memo subclass instance
        """

        raise_unimplemented(self.add_to_queue)


    def assert_on_master(self):
        """Make sure that this is the master processor and not a slave.

        @raises Exception:  If not on the master processor.
        """

        raise_unimplemented(self.assert_on_master)


    def exit(self, status=0):
        """Exit the processor with the given status.

        This default method allows the program to drop off the end and terminate as it normally
        would - i.e. this method does nothing.

        @keyword status:    The program exit status.
        @type status:       int
        """


    def fetch_data(self, name=None):
        """Fetch the data structure of the given name from the data store.

        This can be run on the master or slave processors.

        @keyword name:  The name of the data structure to fetch.
        @type name:     str
        @return:        The value of the associated data structure.
        @rtype:         anything
        """

        # Get the object.
        obj = getattr(self.data_store, name)

        # Return the value.
        return obj


    def get_intro_string(self):
        """Get a string describing the multi processor - designed for overriding.

        The string should be suitable for display at application startup and should be less than
        100 characters wide.  A good example is the string returned by mpi4py_processor:

        >>> MPI running via mpi4py with <n> slave processors & 1 master, mpi version = <x>.<y>

        @see: multi.processor.mpi4py_processor.Mpi4py_processor.get_intro_string.

        @return:    A string describing the multi processor.
        @rtype:     str
        """

        raise_unimplemented(self.get_intro_string)


    def get_name(self):
        """Get the name of the current processor - an abstract method.

        The string should identify the current master or slave processor uniquely but is purely for
        information and debugging.  For example the mpi implementation uses the string
        <host-name>-<process-id> whereas the thread implementation uses the id of the current
        thread as provided by python.

        @return:    The processor identifier.
        @rtype:     str
        """

        raise_unimplemented(self.get_name)


    def get_stdio_pre_strings(self):
        """Get the strings used to prepend STDOUT and STDERR dependent on the current rank.

        For processors with only one slave, the result should be ('', '') - designed for
        overriding.

        @note:  The defaults are ('M S|', 'M E|') and ('NN S|', 'NN E|') for masters and slaves
                respectively, with NN replaced by the rank of the processor.

        @return:    A list of two strings for prepending to each line of STDOUT and STDERR.
        @rtype:     list of 2 str
        """

        # Only prepend text if the verbosity level is set.
        if not verbosity.level():
            return '', ''

        # Initialise.
        pre_string = ''
        stdout_string = ''
        stderr_string = ''
        rank = self.rank()

        # Start of the slave string.
        if self.processor_size() > 1 and rank > 0:
            pre_string = self.rank_format_string() % rank

        # Start of the master string.
        elif self.processor_size() > 1 and rank == 0:
            pre_string = 'M'*self.rank_format_string_width()

        # For multi-processors, the STDOUT and STDERR indicators, and the separator.
        if self.processor_size() > 1:
            stderr_string = pre_string + ' E| '
            stdout_string = pre_string + '  | '

        # Return the strings to prepend to the STDOUT and STDERR streams.
        return stdout_string, stderr_string


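    # An illustrative example, assuming 12 slave processors and a non-zero verbosity level:
    # rank_format_string() is then '%2i', so get_stdio_pre_strings() on the slave of rank 3 returns
    # the pair (' 3  | ', ' 3 E| '), while on the master it returns ('MM  | ', 'MM E| ').
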
    def get_time_delta(self, start_time, end_time):
        """Utility function called to format the difference between application start and end times.

        @todo:  Check my format is correct.

        @param start_time:  The time the application started in seconds since the epoch.
        @type start_time:   float
        @param end_time:    The time the application ended in seconds since the epoch.
        @type end_time:     float
        @return:            The time difference in the format 'hours:minutes:seconds'.
        @rtype:             str
        """

        # The run time as a timedelta object.
        time_diff = end_time - start_time
        time_delta = datetime.timedelta(seconds=time_diff)
        time_delta_str = str(time_delta)

        # Strip off any fractional seconds to leave the 'hours:minutes:seconds' format.
        if '.' in time_delta_str:
            time_delta_str = time_delta_str.split('.', 1)[0]
        return time_delta_str


    def master_queue_command(self, command, dest):
        """Master to slave processor data transfer - send a command from the master to a slave.

        This is invoked by the master processor, e.g. from within self.run_command_queue().

        @param command: The command to send to the slave.
        @type command:  Slave_command instance
        @param dest:    The destination processor's rank.
        @type dest:     int
        """

        raise_unimplemented(self.master_queue_command)


    def master_receive_result(self):
        """Slave to master processor data transfer - receive the result command from the slave.

        This is invoked by the master processor.

        @return:    The result command sent by the slave.
        @rtype:     Result_command instance
        """

        raise_unimplemented(self.master_receive_result)


    def post_run(self):
        """Method called after the application main loop has finished - designed for overriding.

        The default implementation outputs the application runtime to STDOUT.  All subclasses
        should call the base method as their last action via super().  Only called on the master on
        normal exit from the application's run loop.
        """

        if self.rank() == 0:
            end_time = time.time()
            time_delta_str = self.get_time_delta(self.start_time, end_time)

            # Print out the total run time.
            if verbosity.level():
                print('\nOverall runtime: ' + time_delta_str + '\n')


    def pre_run(self):
        """Method called before starting the application main loop - designed for overriding.

        The default implementation just saves the start time for application timing.  All
        subclasses should call the base method via super().  Only called on the master.
        """

        if self.rank() == 0:
            self.start_time = time.time()


    def processor_size(self):
        """Get the number of slave processors - designed for overriding.

        @return:    The number of slave processors.
        @rtype:     int
        """

        return self._processor_size


    def rank(self):
        """Get the rank of this processor - an abstract method.

        The rank of the processor should be a number between 0 and n, where n is the number of
        slave processors; the rank of 0 is reserved for the master processor.

        @return:    The rank of the processor.
        @rtype:     int
        """

        raise_unimplemented(self.rank)


    def rank_format_string(self):
        """Get a formatted string with the rank of a slave.

        Only called on slaves.

        @return:    The string designating the rank of the slave.
        @rtype:     str
        """

        digits = self.rank_format_string_width()
        format = '%%%di' % digits
        return format


    def rank_format_string_width(self):
        """Get the width of the string designating the rank of a slave process.

        Typically this will be the number of digits in the slave's rank.

        @return:    The number of digits in the biggest slave processor's rank.
        @rtype:     int
        """

        return int(math.ceil(math.log10(self.processor_size())))


    def return_object(self, result):
        """Return a result to the master processor from a slave - an abstract method.

        @param result:  A result to be returned to the master processor.
        @type result:   Result_string, Result_command or Exception instance

        @see: multi.processor.Result_string.
        @see: multi.processor.Result_command.
        """

        raise_unimplemented(self.return_object)


    def run(self):
        """Run the processor - an abstract method.

        This function runs the processor main loop and is called after all processor setup has been
        completed.  It performs the remote execution setup and teardown (via self.pre_run() and
        self.post_run()) on either side of a call to Application_callback.init_master.

        @see: multi.processor.Application_callback.
        """

        # Execute any setup code needed for the specific processor fabrics.
        self.pre_run()

        # Execution of the master processor.
        if self.on_master():
            # Execute the program's run() method, as specified by the Application_callback.
            try:
                self.callback.init_master(self)

            # Allow sys.exit() calls.
            except SystemExit:
                # Allow the processor fabric to clean up.
                self.exit()

                # Continue with the sys.exit().
                raise

            # Handle all errors nicely.
            except Exception, e:
                self.callback.handle_exception(self, e)

        # Execution of the slave processor.
        else:
            # Loop until the slave is asked to die via an Exit_command setting the do_quit flag.
            while not self.do_quit:
                # Execute the slave by catching commands, catching all exceptions.
                try:
                    # Fetch any commands on the queue.
                    commands = self.slave_receive_commands()

                    # Convert to a list, if needed.
                    if not isinstance(commands, list):
                        commands = [commands]

                    # Initialise the results list.
                    if self.batched_returns:
                        self.result_list = []
                    else:
                        self.result_list = None

                    # Execute each command, one by one.
                    for i, command in enumerate(commands):
                        # Capture the standard IO streams for the slaves.
                        self.stdio_capture()

                        # Set the completed flag if this is the last command.
                        completed = (i == len(commands)-1)

                        # Execute the calculation.
                        command.run(self, completed)

                        # Restore the IO.
                        self.stdio_restore()

                    # Process the batched results.
                    if self.batched_returns:
                        self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data))
                        self.result_list = None

                # Capture and process all slave exceptions.
                except:
                    capturing_exception = Capturing_exception(rank=self.rank(), name=self.get_name())
                    exception_result = Result_exception(exception=capturing_exception, processor=self, completed=True)

                    self.return_object(exception_result)
                    self.result_list = None

        # Execute any tear down code needed for the specific processor fabrics.
        self.post_run()

        # End of execution, so perform any exiting actions needed by the specific processor fabrics.
        if self.on_master():
            self.exit()


    def run_command_globally(self, command):
        """Run the same command on all slave processors.

        @see: multi.processor.Slave_command.

        @param command: A slave command.
        @type command:  Slave_command instance
        """

        queue = [command for i in range(self.processor_size())]
        self.run_command_queue(queue)


    def run_command_queue(self, queue):
        """Process all commands on the queue and wait for completion.

        @param queue:   The command queue.
        @type queue:    list of Command instances
        """

        # This must only be run on the master processor.
        self.assert_on_master()

        running_set = set()
        idle_set = set([i for i in range(1, self.processor_size()+1)])

        if self.threaded_result_processing:
            result_queue = Threaded_result_queue(self)
        else:
            result_queue = Immediate_result_queue(self)

        while len(queue) != 0:

            while len(idle_set) != 0:
                if len(queue) != 0:
                    command = queue.pop()
                    dest = idle_set.pop()
                    self.master_queue_command(command=command, dest=dest)
                    running_set.add(dest)
                else:
                    break

            # Loop until the queue of calculations is depleted.
            while len(running_set) != 0:
                # Get the result.
                result = self.master_receive_result()

                # Debugging print out.
                if verbosity.level():
                    print('\nIdle set: %s' % idle_set)
                    print('Running set: %s' % running_set)

                # Shift the processor rank to the idle set.
                if result.completed:
                    idle_set.add(result.rank)
                    running_set.remove(result.rank)

                # Add to the result queue for instant or threaded processing.
                result_queue.put(result)

        # Process the threaded results.
        if self.threaded_result_processing:
            result_queue.run_all()


    def run_queue(self):
        """Run the processor queue - an abstract method.

        All commands queued with add_to_queue will be executed.  This function causes the current
        thread to block until the commands have completed.
        """

        #FIXME: need a finally here to cleanup exception states
        lqueue = self.chunk_queue(self.command_queue)
        self.run_command_queue(lqueue)

        del self.command_queue[:]
        self.memo_map.clear()


    def send_data_to_slaves(self, name=None, value=None):
        """Transfer the given data from the master to all slaves.

        @keyword name:  The name of the data structure to store.
        @type name:     str
        @keyword value: The data structure.
        @type value:    anything
        """

        # This must be the master processor!
        self.assert_on_master()

        # Create the command list.
        for i in range(self.processor_size()):
            # Create and append the command.
            command = Slave_storage_command()

            # Add the data to the command.
            command.add(name, value)

            # Add the command to the queue.
            self.add_to_queue(command)

        # Flush the queue.
        self.run_queue()


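    # Illustrative usage of send_data_to_slaves() together with fetch_data() (the name 'spin_data'
    # and the object spins are hypothetical):  on the master call
    #     processor.send_data_to_slaves(name='spin_data', value=spins)
    # and then, from within a Slave_command running on any slave, retrieve the data with
    #     spins = processor.fetch_data(name='spin_data')
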
    def stdio_capture(self):
        """Enable capture of the STDOUT and STDERR streams.

        This is currently used to capture the IO streams of the slaves to return back to the
        master.
        """

        # Store the original STDOUT and STDERR for restoring later on.
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr

        # The data object.
        self.io_data = []

        # Get the strings to prepend to the IO streams.
        pre_strings = self.get_stdio_pre_strings()

        # Then redirect IO.
        sys.stdout = Redirect_text(self.io_data, token=pre_strings[0], stream=0)
        sys.stderr = Redirect_text(self.io_data, token=pre_strings[1], stream=1)


    def stdio_restore(self):
        """Restore the original STDOUT and STDERR streams."""

        # Restore the original streams.
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr