
Source Code for Module multi.processor

###############################################################################
#                                                                             #
# Copyright (C) 2007 Gary S Thompson                                          #
# Copyright (C) 2008,2010-2013 Edward d'Auvergne                              #
#                                                                             #
# This file is part of the program relax (          #
#                                                                             #
# This program is free software: you can redistribute it and/or modify        #
# it under the terms of the GNU General Public License as published by        #
# the Free Software Foundation, either version 3 of the License, or           #
# (at your option) any later version.                                         #
#                                                                             #
# This program is distributed in the hope that it will be useful,             #
# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
# GNU General Public License for more details.                                #
#                                                                             #
# You should have received a copy of the GNU General Public License           #
# along with this program.  If not, see <>.       #
#                                                                             #
###############################################################################

# Module docstring.
"""The processor class is the central class in the multi python multiprocessor framework.

Overview
========

The framework has two main responsibilities:

     1. Process management - if needed the processor can create the slave processes it manages if
        they haven't been created by the operating system.  It is also responsible for reporting
        exceptions and shutting down the multiprocessor in the face of errors.
     2. Scheduling commands on the slave processors via an interprocess communication fabric (MPI,
        PVM, threads etc) and processing returned text and result commands.


Using the processor framework
=============================

Users of the processor framework will typically use the following methodology:

     1. At application startup determine the name of the required processor implementation and the number of slave processors requested.

     2. Create an Application_callback object.  For example:
            relax_instance = Relax()
            callbacks = Application_callback(master=relax_instance)

     3. Dynamically load a processor implementation using the name of the processor and the number of required slave processors.  For example:
            processor = Processor.load_multiprocessor(relax_instance.multiprocessor_type, callbacks, processor_size=relax_instance.n_processors)

     4. Call run on the processor instance returned above and handle all Exceptions.  For example:

     5. After calling run, the processor will call back to Application_callback.init_master from which you should call your main program (Application_callback defaults to

     6. Once in the main program you should call processor.add_to_queue with a series of multi.Slave_command objects you wish to be run across the slave processor pool and then call processor.run_queue to actually execute the commands remotely while blocking.
        >>>
        example here...

     7. Processor.Slave_commands will then run remotely on the slaves and any thrown exceptions and processor.result_commands queued to processor.return_object will be returned to the master processor and handled or executed.  The slave processors also provide facilities for capturing the STDERR and STDOUT streams and returning their contents as strings for display on the master's STDOUT and STDERR streams (***more**?).


Extending the processor framework with a new interprocess communication fabric
==============================================================================

The processor class acts as a base class that defines all the commands that a processor implementing
a new inter processor communication fabric needs.  All that is required is to implement a subclass of
processor providing the required methods (of course as python provides dynamic typing and
polymorphism 'duck typing' you can always implement a class with the same set of methods and it will
also work).  Currently processor classes are loaded from the processor module and are modules with
names of the form:

>>> multi.<type>_processor.<Type>_processor

where <Type> is the name of the processor with the correct capitalisation e.g.

>>> processor_name = 'mpi4py'
>>> callback = My_application_callback()
>>> processor_size = 6
>>> processor.load_multiprocessor(processor_name, callback, processor_size)

will load multi.mpi4py_processor.Mpi4py_processor.


TODO
====

The following are yet to be implemented:

    1. There is no ability of the processor to request command line arguments.

    2. The processor can't currently be loaded from somewhere other than the multi directory.

"""

#FIXME: better requirement of inherited commands.
#TODO: check exceptions on master.

# Python module imports.
import time, datetime, math, sys

# multi module imports.
from multi.misc import Capturing_exception, raise_unimplemented, Verbosity; verbosity = Verbosity()
from multi.result_queue import Immediate_result_queue, Threaded_result_queue
from multi.processor_io import Redirect_text
from multi.result_commands import Batched_result_command, Null_result_command, Result_exception
from multi.slave_commands import Slave_storage_command


class Data_store:
    """A special Processor specific data storage container."""


class Processor(object):
    """The central class of the multi processor framework.

    This provides facilities for process management, command queueing, command scheduling, remote
    execution of commands, and handling of results and errors from commands.  The class is abstract
    and should be overridden to implement new interprocess communication methods, however, even then
    users are encouraged to override the more fully implemented multi.multi_processor.Multi_processor
    class.  Most users should instantiate instances of this class by calling the static method
    Processor.load_multiprocessor.

    The class is designed to be subclassed and has abstract methods that a subclass needs to
    override.  Methods which can be overridden are clearly marked with a note annotation stating that
    they can be overridden.

    @todo:  It may be a good idea to separate out the features of the class that purely deal with the
            interprocess communication fabric.
    @todo:  The processor can't currently harvest the required command line arguments from the
            current command line.
    """


    def __init__(self, processor_size, callback):
        """Initialise the processor.

        @param processor_size:  The requested number of __slave__ processors.  If the number of
                                processors is set by the environment (e.g. in the case of MPI via
                                mpiexec -np <n-processors> on the command line), the processor is
                                free to ignore this value.  The default value from the command line
                                is -1, and subclasses on receiving this value either raise an
                                exception or determine the correct number of slaves to create (e.g.
                                on a multi-cored machine using a threaded implementation the correct
                                number of slaves would be equal to the number of cores available).
        @type processor_size:   int
        @param callback:        The application callback which allows the host application to start
                                its main loop and handle exceptions from the processor.
        @type callback:         multi.processor.Application_callback instance
        """

        self.callback = callback
        """Callback to interface to the host application

        @see:  Application_callback."""

        self.grainyness = 1
        """The number of sub jobs to queue for each processor if we have more jobs than processors."""

        # # CHECKME: am I implemented?, should I be an application callback function
        # self.pre_queue_command = None
        # """ command to call before the queue is run"""
        # # CHECKME: am I implemented?, should I be an application callback function
        # self.post_queue_command = None
        # """ command to call after the queue has completed running"""
        #
        #CHECKME: should I be a singleton
        self.NULL_RESULT = Null_result_command(processor=self)
        """Empty result command used by commands which do not return a result (a singleton?)."""

        # Initialise the processor specific data store.
        self.data_store = Data_store()
        """The processor data store."""

        self._processor_size = processor_size
        """Number of slave processors available in this processor."""

        self.threaded_result_processing = True
        """Flag for the handling of result processing via self.run_command_queue()."""


    def abort(self):
        """Shutdown the multi processor in exceptional conditions - designed for overriding.

        This method is called after an exception from the master or slave has been raised and processed and is responsible for the shutdown of the multi processor fabric and terminating the application.  It should be called as the last thing that Application_callback.handle_exception does.

        As an example of the method's use see Mpi4py_processor.abort which calls MPI.COMM_WORLD.Abort() to cleanly shutdown the mpi framework and remove dangling processes.

        The default action is to call the special self.exit() method.

        @see:   multi.processor.Application_callback.
        @see:   multi.mpi4py_processor.Mpi4py_processor.abort().
        @see:   mpi4py.MPI.COMM_WORLD.Abort().
        """

        self.exit()


    def add_to_queue(self, command, memo=None):
        """Add a command for remote execution to the queue - an abstract method.

        @see: multi.processor.Slave_command
        @see: multi.processor.Result_command
        @see: multi.processor.Memo

        @param command: A command to execute on a slave processor.
        @type command:  ? subclass instance
        @keyword memo:  A place to store data needed on command completion (e.g. where to save the
                        results).  The data stored in the memo is provided to the Result_commands
                        generated by the submitted command.
        @type memo:     Memo subclass instance
        """

        raise_unimplemented(self.add_to_queue)
216 217
218 - def assert_on_master(self):
219 """Make sure that this is the master processor and not a slave. 220 221 @raises Exception: If not on the master processor. 222 """ 223 224 raise_unimplemented(self.assert_on_master)
225 226
227 - def exit(self, status=0):
228 """Exit the processor with the given status. 229 230 This default method allows the program to drop off the end and terminate as it normally would - i.e. this method does nothing. 231 232 @keyword status: The program exit status. 233 @type status: int 234 """
235 236
237 - def fetch_data(self, name=None):
238 """Fetch the data structure of the given name from the data store. 239 240 This can be run on the master or slave processors. 241 242 243 @keyword name: The name of the data structure to fetch. 244 @type name: str 245 @return: The value of the associated data structure. 246 @rtype: anything 247 """ 248 249 # Get the object. 250 obj = getattr(self.data_store, name) 251 252 # Return the value. 253 return obj
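
A minimal sketch of the name-based lookup that fetch_data() performs, assuming that values are attached to the Data_store instance as plain attributes. The helpers store_data and fetch_data_sketch are hypothetical names used only for illustration; how Slave_storage_command actually writes into the store is not shown in this module.

```python
class Data_store:
    """Empty container; data structures are attached as attributes by name."""


def store_data(data_store, name, value):
    """Hypothetical helper: attach a value to the store under the given name."""
    setattr(data_store, name, value)


def fetch_data_sketch(data_store, name):
    """Mirror of the lookup in Processor.fetch_data(): get the attribute by name."""
    return getattr(data_store, name)


store = Data_store()
store_data(store, 'grid_points', [1, 2, 3])
fetched = fetch_data_sketch(store, 'grid_points')
```

Asking for a name that was never stored raises AttributeError, so callers may want to guard the lookup.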
254 255
256 - def get_intro_string(self):
257 """Get a string describing the multi processor - designed for overriding. 258 259 The string should be suitable for display at application startup and should be less than 100 260 characters wide. A good example is the string returned by mpi4py_processor: 261 262 >>> MPI running via mpi4py with <n> slave processors & 1 master, mpi version = <x>.<y> 263 264 @see: multi.processor.mpi4py_processor.Mpi4py_processor.get_intro_string. 265 266 @return: A string describing the multi processor. 267 @rtype: str 268 """ 269 270 raise_unimplemented(self.get_intro_string)
271 272
273 - def get_name(self):
274 """Get the name of the current processor - an abstract method. 275 276 The string should identify the current master or slave processor uniquely but is purely for 277 information and debugging. For example the mpi implementation uses the string 278 <host-name>-<process-id> whereas the thread implementation uses the id of the current thread 279 as provided by python. 280 281 @return: The processor identifier. 282 @rtype: str 283 """ 284 285 raise_unimplemented(self.get_name)
286 287
288 - def get_stdio_pre_strings(self):
289 """Get the strings used prepend STDOUT and STDERR dependant on the current rank. 290 291 For processors with only one slave the result should be ('', '') - designed for overriding. 292 293 @note: The defaults are ('M S|', 'M E|') and ('NN S|' , 'NN E|') for masters and slaves 294 respectively with NN replaced by the rank of the processor. 295 296 @return: A list of two strings for prepending to each line of STDOUT and STDERR. 297 @rtype: list of 2 str 298 """ 299 300 # Only prepend test if the verbosity level is set. 301 if not verbosity.level(): 302 return '', '' 303 304 # Initialise. 305 pre_string = '' 306 stdout_string = '' 307 stderr_string = '' 308 rank = self.rank() 309 310 # Start of the slave string. 311 if self.processor_size() > 1 and rank > 0: 312 pre_string = self.rank_format_string() % rank 313 314 # Start of the master string. 315 elif self.processor_size() > 1 and rank == 0: 316 pre_string = 'M'*self.rank_format_string_width() 317 318 # For multi-processors, the STDOUT and STDERR indicators, and the separator. 319 if self.processor_size() > 1: 320 stderr_string = pre_string + ' E| ' 321 stdout_string = pre_string + ' | ' 322 323 # Return the strings to prepend to the STDOUT and STDERR streams. 324 return stdout_string, stderr_string


    def get_time_delta(self, start_time, end_time):
        """Utility function called to format the difference between application start and end times.

        @todo:  Check my format is correct.

        @param start_time:  The time the application started in seconds since the epoch.
        @type start_time:   float
        @param end_time:    The time the application ended in seconds since the epoch.
        @type end_time:     float
        @return:            The time difference in the format 'hours:minutes:seconds'.
        @rtype:             str
        """

        time_diff = end_time - start_time
        time_delta = datetime.timedelta(seconds=time_diff)

        # Strip the fractional seconds, if any (a whole-second difference has no '.' to split on).
        time_delta_str = str(time_delta).split('.', 1)[0]
        return time_delta_str
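
The 'hours:minutes:seconds' format comes from the string form of datetime.timedelta. The following is a standalone sketch of that formatting, using the hypothetical function name format_time_delta, which also copes with whole-second differences where there is no fractional part to strip.

```python
import datetime


def format_time_delta(start_time, end_time):
    """Format a time difference in seconds, dropping any fractional seconds."""
    time_delta = datetime.timedelta(seconds=end_time - start_time)

    # str(timedelta) only contains a '.' when the microseconds are non-zero.
    return str(time_delta).split('.', 1)[0]


short_run = format_time_delta(0.0, 3725.5)     # '1:02:05'
whole_run = format_time_delta(0.0, 60.0)       # '0:01:00'
long_run = format_time_delta(0.0, 90000.0)     # '1 day, 1:00:00'
```

Note that runs of 24 hours or more gain a day count prefix, so the result is not strictly 'hours:minutes:seconds' in that case.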


    def is_queued(self):
        """Determine if any slave commands are queued.

        @return:    True if slave commands are in the queue, False otherwise.
        @rtype:     bool
        """

        # The standard command queue.
        if len(self.command_queue):
            return True

        # Nothing.
        return False


    def master_queue_command(self, command, dest):
        """Master to slave processor data transfer - send the command to the slave.

        This is invoked by the master processor (see run_command_queue()).


        @param command: The command to send to the slave.
        @type command:  Slave_command instance
        @param dest:    The destination processor's rank.
        @type dest:     int
        """

        raise_unimplemented(self.master_queue_command)


    def master_receive_result(self):
        """Slave to master processor data transfer - receive the result command from the slave.

        This is invoked by the master processor.

        @return:        The result command sent by the slave.
        @rtype:         Result_command instance
        """

        raise_unimplemented(self.master_receive_result)


    def post_run(self):
        """Method called after the application main loop has finished - designed for overriding.

        The default implementation outputs the application runtime to STDOUT.  All subclasses should
        call the base method as their last action via super().  Only called on the master on normal
        exit from the application's run loop.
        """

        if self.rank() == 0:
            end_time = time.time()
            time_delta_str = self.get_time_delta(self.start_time, end_time)

            # Print out of the total run time.
            if verbosity.level():
                print('\nOverall runtime: ' + time_delta_str + '\n')


    def pre_run(self):
        """Method called before starting the application main loop - designed for overriding.

        The default implementation just saves the start time for application timing.  All subclasses
        should call the base method via super().  Only called on the master.
        """

        if self.rank() == 0:
            self.start_time = time.time()


    def processor_size(self):
        """Get the number of slave processors - designed for overriding.

        @return:    The number of slave processors.
        @rtype:     int
        """

        return self._processor_size


    def rank(self):
        """Get the rank of this processor - an abstract method.

        The rank of the processor should be a number between 0 and n where n is the number of slave
        processors.  The rank of 0 is reserved for the master processor.

        @return:    The rank of the processor.
        @rtype:     int
        """

        raise_unimplemented(self.rank)


    def rank_format_string(self):
        """Get a formatted string with the rank of a slave.

        Only called on slaves.

        @return:    The string designating the rank of the slave.
        @rtype:     str
        """

        digits = self.rank_format_string_width()
        format = '%%%di' % digits
        return format


    def rank_format_string_width(self):
        """Get the width of the string designating the rank of a slave process.

        Typically this will be the number of digits in the slave's rank.

        @return:    The number of digits in the biggest slave processor's rank.
        @rtype:     int
        """

        return int(math.ceil(math.log10(self.processor_size())))
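
A short worked comparison of the width calculation, written as standalone functions with hypothetical names (width_log10 and width_digits). It assumes that slave ranks run from 1 to n inclusive, as described in rank(), and shows that the log10 formula returns one digit fewer than the largest rank needs whenever n is an exact power of ten. Counting the characters of str(n) is one possible alternative.

```python
import math


def width_log10(n_slaves):
    """The width as computed by rank_format_string_width()."""
    return int(math.ceil(math.log10(n_slaves)))


def width_digits(n_slaves):
    """Alternative sketch: the number of digits in the largest rank, n_slaves."""
    return len(str(n_slaves))


# Tabulate both widths for a few pool sizes.
widths = {n: (width_log10(n), width_digits(n)) for n in (6, 10, 99, 100)}

# The '%Ni' format only sets a minimum width, so rank 10 still prints in full.
label = ('%%%di' % width_log10(10)) % 10
```

Because the format width is only a minimum, the shorter width causes misaligned prefixes rather than truncated ranks.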


    def return_object(self, result):
        """Return a result to the master processor from a slave - an abstract method.

        @param result:  A result to be returned to the master processor.
        @type result:   Result_string, Result_command or Exception instance

        @see:   multi.processor.Result_string.
        @see:   multi.processor.Result_command.
        """

        raise_unimplemented(self.return_object)


    def run(self):
        """Run the processor - an abstract method.

        This function runs the processor main loop and is called after all processor setup has been completed.  It does remote execution setup and teardown (via self.pre_run() and self.post_run()) round either side of a call to Application_callback.init_master.

        @see:   multi.processor.Application_callback.
        """

        # Execute any setup code needed for the specific processor fabrics.
        self.pre_run()

        # Execution of the master processor.
        if self.on_master():
            # Execute the program's run() method, as specified by the Application_callback.
            try:
                self.callback.init_master(self)

            # Allow sys.exit() calls.
            except SystemExit:
                # Allow the processor fabric to clean up.
                self.exit()

                # Continue with the sys.exit().
                raise

            # Handle all errors nicely.
            except Exception:
                e = sys.exc_info()[1]
                self.callback.handle_exception(self, e)

        # Execution of the slave processor.
        else:
            # Loop until the slave is asked to die via an Exit_command setting the do_quit flag.
            while not self.do_quit:
                # Execute the slave by catching commands, catching all exceptions.
                try:
                    # Fetch any commands on the queue.
                    commands = self.slave_receive_commands()

                    # Convert to a list, if needed.
                    if not isinstance(commands, list):
                        commands = [commands]

                    # Initialise the results list.
                    if self.batched_returns:
                        self.result_list = []
                    else:
                        self.result_list = None

                    # Capture the standard IO streams for the slaves.
                    self.stdio_capture()

                    # Execute each command, one by one.
                    for i, command in enumerate(commands):
                        # Set the completed flag if this is the last command.
                        completed = (i == len(commands)-1)

                        # Execute the calculation.
                        command.run(self, completed)

                    # Restore the IO.
                    self.stdio_restore()

                    # Process the batched results.
                    if self.batched_returns:
                        self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data))
                        self.result_list = None

                # Capture and process all slave exceptions.
                except:
                    capturing_exception = Capturing_exception(rank=self.rank(), name=self.get_name())
                    exception_result = Result_exception(exception=capturing_exception, processor=self, completed=True)

                    self.return_object(exception_result)
                    self.result_list = None

        # Execute any tear down code needed for the specific processor fabrics.
        self.post_run()

        # End of execution, so perform any exiting actions needed by the specific processor fabrics.
        if self.on_master():
            self.exit()


    def run_command_globally(self, command):
        """Run the same command on all slave processors.

        @see:   multi.processor.processor.Slave_command.

        @param command: A slave command.
        @type command:  Slave_command instance
        """

        queue = [command for i in range(self.processor_size())]
        self.run_command_queue(queue)


    def run_command_queue(self, queue):
        """Process all commands on the queue and wait for completion.

        @param queue:   The command queue.
        @type queue:    list of Command instances
        """

        # This must only be run on the master processor.
        self.assert_on_master()

        running_set = set()
        idle_set = set([i for i in range(1, self.processor_size()+1)])

        if self.threaded_result_processing:
            result_queue = Threaded_result_queue(self)
        else:
            result_queue = Immediate_result_queue(self)

        while len(queue) != 0:

            while len(idle_set) != 0:
                if len(queue) != 0:
                    command = queue.pop()
                    dest = idle_set.pop()
                    self.master_queue_command(command=command, dest=dest)
                    running_set.add(dest)
                else:
                    break

            # Loop until the queue of calculations is depleted.
            while len(running_set) != 0:
                # Get the result.
                result = self.master_receive_result()

                # Debugging printout.
                if verbosity.level():
                    print('\nIdle set: %s' % idle_set)
                    print('Running set: %s' % running_set)

                # Shift the processor rank to the idle set.
                if result.completed:
                    idle_set.add(result.rank)
                    running_set.remove(result.rank)

                # Add to the result queue for instant or threaded processing.
                result_queue.put(result)

        # Process the threaded results.
        if self.threaded_result_processing:
            result_queue.run_all()
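
The idle set and running set bookkeeping of run_command_queue() can be sketched without any communication fabric. In this hypothetical, single-process illustration (the name run_queue_sketch and the execute callable are assumptions, not part of the multi package), each "slave" is simulated by calling execute() directly, and a dict stands in for the running set so that the command in flight on each rank can be looked up.

```python
def run_queue_sketch(queue, n_slaves, execute):
    """Dispatch commands to idle ranks in rounds and collect (rank, result) pairs."""
    queue = list(queue)
    idle_set = set(range(1, n_slaves + 1))    # Rank 0 is reserved for the master.
    running = {}                              # rank -> command in flight
    results = []

    while queue:
        # Hand one command to every idle rank while commands remain.
        while idle_set and queue:
            command = queue.pop()
            dest = idle_set.pop()
            running[dest] = command

        # Wait for all running ranks to finish before the next round.
        while running:
            rank, command = running.popitem()
            results.append((rank, execute(command)))
            idle_set.add(rank)

    return results


results = run_queue_sketch([1, 2, 3, 4, 5], n_slaves=2, execute=lambda x: x * x)
```

As in the method above, a new round of commands is only dispatched once the running set has been fully drained, so a single slow command holds back the whole round.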


    def run_queue(self):
        """Run the processor queue - an abstract method.

        All commands queued with add_to_queue will be executed.  This function causes the current
        thread to block until the commands have completed.
        """

        #FIXME: need a finally here to cleanup exceptions states
        lqueue = self.chunk_queue(self.command_queue)
        self.run_command_queue(lqueue)

        del self.command_queue[:]
        self.memo_map.clear()


    def send_data_to_slaves(self, name=None, value=None):
        """Transfer the given data from the master to all slaves.

        @keyword name:  The name of the data structure to store.
        @type name:     str
        @keyword value: The data structure.
        @type value:    anything
        """

        # This must be the master processor!
        self.assert_on_master()

        # Create the command list.
        for i in range(self.processor_size()):
            # Create and append the command.
            command = Slave_storage_command()

            # Add the data to the command.
            command.add(name, value)

            # Add the command to the queue.
            self.add_to_queue(command)

        # Flush the queue.
        self.run_queue()


    def stdio_capture(self):
        """Enable capture of the STDOUT and STDERR.

        This is currently used to capture the IO streams of the slaves to return back to the master.
        """

        # Store the original STDOUT and STDERR for restoring later on.
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr

        # The data object.
        self.io_data = []

        # Get the strings to prepend to the IO streams.
        pre_strings = self.get_stdio_pre_strings()

        # Then redirect IO.
        sys.stdout = Redirect_text(self.io_data, token=pre_strings[0], stream=0)
        sys.stderr = Redirect_text(self.io_data, token=pre_strings[1], stream=1)


    def stdio_restore(self):
        """Restore the original STDOUT and STDERR streams."""

        # Restore the original streams.
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr
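
The capture and restore pair can be sketched as follows. Prefixing_capture is a hypothetical stand-in for multi.processor_io.Redirect_text: only its constructor arguments (the shared list, token and stream) are taken from the calls above, and the way it stores text is an assumption made for illustration. The sketch also wraps the call in try/finally so that the streams are restored even if the captured function raises, which is a design choice of the sketch rather than something the methods above do.

```python
import sys


class Prefixing_capture:
    """Hypothetical file-like object appending (prefixed text, stream) pairs to a list."""

    def __init__(self, data, token='', stream=0):
        self.data = data
        self.token = token
        self.stream = stream

    def write(self, text):
        # Assumed behaviour: prefix every write with the token.
        self.data.append((self.token + text, self.stream))

    def flush(self):
        pass


def capture(func, pre_strings=('1 | ', '1 E| ')):
    """Run func() with STDOUT and STDERR redirected, returning the captured data."""
    io_data = []
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    sys.stdout = Prefixing_capture(io_data, token=pre_strings[0], stream=0)
    sys.stderr = Prefixing_capture(io_data, token=pre_strings[1], stream=1)
    try:
        func()
    finally:
        # Always restore the original streams.
        sys.stdout, sys.stderr = orig_stdout, orig_stderr
    return io_data


def job():
    sys.stdout.write('result\n')
    sys.stderr.write('warning\n')


captured = capture(job)
```

The example writes whole lines directly because print() may issue the trailing newline as a separate write, which a real implementation would need to handle when prefixing.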