Package multi :: Module processor

Source Code for Module multi.processor

###############################################################################
#                                                                             #
# Copyright (C) 2007 Gary S Thompson (https://gna.org/users/varioustoxins)    #
# Copyright (C) 2011-2012 Edward d'Auvergne                                   #
#                                                                             #
# This file is part of the program relax (http://www.nmr-relax.com).          #
#                                                                             #
# This program is free software: you can redistribute it and/or modify        #
# it under the terms of the GNU General Public License as published by        #
# the Free Software Foundation, either version 3 of the License, or           #
# (at your option) any later version.                                         #
#                                                                             #
# This program is distributed in the hope that it will be useful,             #
# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
# GNU General Public License for more details.                                #
#                                                                             #
# You should have received a copy of the GNU General Public License           #
# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
#                                                                             #
###############################################################################

# Module docstring.
"""The processor class is the central class in the multi Python multiprocessor framework.

Overview
========

The framework has two main responsibilities:

     1. Process management - if needed, the processor can create the slave processes it manages if
        they haven't been created by the operating system.  It is also responsible for reporting
        exceptions and shutting down the multiprocessor in the face of errors.
     2. Scheduling commands on the slave processors via an interprocess communication fabric (MPI,
        PVM, threads, etc.) and processing returned text and result commands.


Using the processor framework
=============================

Users of the processor framework will typically use the following methodology:

     1. At application startup, determine the name of the required processor implementation and the number of slave processors requested.

     2. Create an Application_callback object.  For example:
            relax_instance = Relax()
            callbacks = Application_callback(master=relax_instance)

     3. Dynamically load a processor implementation using the name of the processor and the number of required slave processors.  For example:
            processor = Processor.load_multiprocessor(relax_instance.multiprocessor_type, callbacks, processor_size=relax_instance.n_processors)

     4. Call run() on the processor instance returned above and handle all exceptions.  For example:
            processor.run()

     5. After calling run(), the processor will call back to Application_callback.init_master, from which you should call your main program (Application_callback defaults to self.master.run()).

     6. Once in the main program, call processor.add_to_queue() with a series of multi.Slave_command objects you wish to be run across the slave processor pool, and then call processor.run_queue() to execute the commands remotely while blocking.  A sketch is given after this list.

     7. The queued Slave_command objects will then run remotely on the slaves.  Any thrown exceptions, together with the result commands queued to processor.return_object(), will be returned to the master processor and handled or executed.  The slave processors also provide facilities for capturing the STDERR and STDOUT streams and returning their contents as strings for display on the master's STDOUT and STDERR streams.

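The following is an illustrative sketch of steps 6 and 7 only.  The Sum_command and Sum_result_command classes are hypothetical, and the import location of the Slave_command base class is assumed from the naming used in step 6 rather than taken from this module:

    >>> from multi import Slave_command    # Import path assumed from step 6 above.
    >>>
    >>> class Sum_command(Slave_command):
    ...     def __init__(self, numbers):
    ...         self.numbers = numbers
    ...
    ...     def run(self, processor, completed):
    ...         # Executed on a slave - calculate, then hand a result command back to the master.
    ...         total = sum(self.numbers)
    ...         processor.return_object(Sum_result_command(processor=processor, total=total, completed=completed))
    >>>
    >>> # On the master, from within Application_callback.init_master():
    >>> for block in ([1, 2], [3, 4], [5, 6]):
    ...     processor.add_to_queue(Sum_command(block))
    >>> processor.run_queue()    # Blocks until all commands and their result commands have completed.
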

Extending the processor framework with a new interprocess communication fabric
==============================================================================

The processor class acts as a base class that defines all the commands that a processor
implementing a new interprocess communication fabric needs.  All that is required is to implement
a subclass of Processor providing the required methods (of course, as Python provides dynamic
typing and polymorphism via 'duck typing', you can always implement a class with the same set of
methods and it will also work).  Currently processor classes are loaded from the processor module
and are modules with names of the form:

>>> multi.<type>_processor.<Type>_processor

where <Type> is the name of the processor with the correct capitalisation, e.g.

>>> processor_name = 'mpi4py'
>>> callback = My_application_callback()
>>> processor_size = 6
>>> Processor.load_multiprocessor(processor_name, callback, processor_size)

will load multi.mpi4py_processor.Mpi4py_processor.

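As a purely illustrative outline only, a new fabric named 'fake' would be placed in a hypothetical
module multi/fake_processor.py and would subclass Processor (or, preferably, the more fully
implemented multi.multi_processor.Multi_processor class), overriding the abstract methods defined
below.  The method bodies here are placeholders, not a working implementation:

>>> from multi.processor import Processor
>>>
>>> class Fake_processor(Processor):
...     def rank(self):
...         # 0 for the master, 1 to n for the slaves.
...         return 0
...
...     def get_name(self):
...         # Any unique identifier, e.g. a host name and process id.
...         return 'fake-0'
...
...     def get_intro_string(self):
...         return "Fake fabric with %i slave processors & 1 master." % self.processor_size()
...
...     def add_to_queue(self, command, memo=None):
...         pass    # Queue the command and memo for later execution by run_queue().
...
...     def run_queue(self):
...         pass    # Execute all queued commands, blocking until completion.
...
...     def return_object(self, result):
...         pass    # Send a result from a slave back to the master.
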

TODO
====

The following are yet to be implemented:

    1. There is no ability of the processor to request command line arguments.

    2. The processor can't currently be loaded from somewhere other than the multi directory.

"""

#FIXME: better requirement of inherited commands.
#TODO: check exceptions on master.

# Python module imports.
import time, datetime, math, sys

# multi module imports.
from multi.misc import Capturing_exception, raise_unimplemented, Verbosity; verbosity = Verbosity()
from multi.result_queue import Immediate_result_queue, Threaded_result_queue    # Immediate_result_queue is required by run_command_queue() below.
from multi.processor_io import Redirect_text
from multi.result_commands import Batched_result_command, Null_result_command, Result_exception
from multi.slave_commands import Slave_storage_command


class Data_store:
    """A special Processor specific data storage container."""


class Processor(object):
    """The central class of the multi processor framework.

    This provides facilities for process management, command queueing, command scheduling, remote
    execution of commands, and handling of results and errors from commands.  The class is
    abstract and should be overridden to implement new interprocess communication methods.
    However, even then users are encouraged to override the more fully implemented
    multi.multi_processor.Multi_processor class.  Most users should instantiate instances of this
    class by calling the static method Processor.load_multiprocessor.

    The class is designed to be subclassed and has abstract methods that a subclass needs to
    override.  Methods which can be overridden are clearly marked with a note annotation stating
    that they can be overridden.

    @todo:  It may be a good idea to separate out the features of the class that purely deal with
            the interprocess communication fabric.
    @todo:  The processor can't currently harvest the required command line arguments from the
            current command line.
    """

    def __init__(self, processor_size, callback):
        """Initialise the processor.

        @param processor_size:  The requested number of slave processors.  If the number of
                                processors is set by the environment (e.g. in the case of MPI via
                                mpiexec -np <n-processors> on the command line), the processor is
                                free to ignore this value.  The default value from the command
                                line is -1, and subclasses on receiving this value either raise an
                                exception or determine the correct number of slaves to create
                                (e.g. on a multi-core machine using a threaded implementation the
                                correct number of slaves would be equal to the number of cores
                                available).
        @type processor_size:   int
        @param callback:        The application callback which allows the host application to
                                start its main loop and handle exceptions from the processor.
        @type callback:         multi.processor.Application_callback instance
        """

        self.callback = callback
        """Callback to interface to the host application.

        @see: Application_callback."""

        self.grainyness = 1
        """The number of sub jobs to queue for each processor if we have more jobs than processors."""

        # # CHECKME: am I implemented?, should I be an application callback function
        # self.pre_queue_command = None
        # """ command to call before the queue is run"""
        # # CHECKME: am I implemented?, should I be an application callback function
        # self.post_queue_command = None
        # """ command to call after the queue has completed running"""
        #
        #CHECKME: should I be a singleton
        self.NULL_RESULT = Null_result_command(processor=self)
        """Empty result command used by commands which do not return a result (a singleton?)."""

        # Initialise the processor specific data store.
        self.data_store = Data_store()
        """The processor data store."""

        self._processor_size = processor_size
        """Number of slave processors available in this processor."""

        self.threaded_result_processing = True
        """Flag for the handling of result processing via self.run_command_queue()."""

    def abort(self):
        """Shutdown the multi processor in exceptional conditions - designed for overriding.

        This method is called after an exception from the master or slave has been raised and
        processed, and is responsible for the shutdown of the multi processor fabric and for
        terminating the application.  The function should be called as the last thing that
        Application_callback.handle_exception does.

        As an example of the method's use, see Mpi4py_processor.abort which calls
        MPI.COMM_WORLD.Abort() to cleanly shutdown the MPI framework and remove dangling processes.

        The default action is to call the special self.exit() method.

        @see: multi.processor.Application_callback.
        @see: multi.mpi4py_processor.Mpi4py_processor.abort().
        @see: mpi4py.MPI.COMM_WORLD.Abort().
        """

        self.exit()

    def add_to_queue(self, command, memo=None):
        """Add a command for remote execution to the queue - an abstract method.

        @see: multi.processor.Slave_command
        @see: multi.processor.Result_command
        @see: multi.processor.Memo

        @param command: A command to execute on a slave processor.
        @type command:  Slave_command subclass instance
        @keyword memo:  A place to store data needed on command completion (e.g. where to save the
                        results).  The data stored in the memo is provided to the Result_commands
                        generated by the command submitted.
        @type memo:     Memo subclass instance
        """

        raise_unimplemented(self.add_to_queue)

    def assert_on_master(self):
        """Make sure that this is the master processor and not a slave.

        @raises Exception:  If not on the master processor.
        """

        raise_unimplemented(self.assert_on_master)

    def exit(self, status=0):
        """Exit the processor with the given status.

        This default method allows the program to drop off the end and terminate as it normally
        would - i.e. this method does nothing.

        @keyword status:    The program exit status.
        @type status:       int
        """

    def fetch_data(self, name=None):
        """Fetch the data structure of the given name from the data store.

        This can be run on the master or slave processors.


        @keyword name:  The name of the data structure to fetch.
        @type name:     str
        @return:        The value of the associated data structure.
        @rtype:         anything
        """

        # Get the object.
        obj = getattr(self.data_store, name)

        # Return the value.
        return obj

    def get_intro_string(self):
        """Get a string describing the multi processor - designed for overriding.

        The string should be suitable for display at application startup and should be less than
        100 characters wide.  A good example is the string returned by mpi4py_processor:

        >>> MPI running via mpi4py with <n> slave processors & 1 master, mpi version = <x>.<y>

        @see: multi.mpi4py_processor.Mpi4py_processor.get_intro_string.

        @return:    A string describing the multi processor.
        @rtype:     str
        """

        raise_unimplemented(self.get_intro_string)

    def get_name(self):
        """Get the name of the current processor - an abstract method.

        The string should identify the current master or slave processor uniquely, but is purely
        for information and debugging.  For example the MPI implementation uses the string
        <host-name>-<process-id> whereas the thread implementation uses the id of the current
        thread as provided by Python.

        @return:    The processor identifier.
        @rtype:     str
        """

        raise_unimplemented(self.get_name)

    def get_stdio_pre_strings(self):
        """Get the strings used to prepend to STDOUT and STDERR output, dependent on the current rank.

        For processors with only one slave the result should be ('', '') - designed for overriding.

        @note:  The defaults are ('M S|', 'M E|') and ('NN S|', 'NN E|') for masters and slaves
                respectively, with NN replaced by the rank of the processor.

        @return:    A list of two strings for prepending to each line of STDOUT and STDERR.
        @rtype:     list of 2 str
        """

        # Only prepend text if the verbosity level is set.
        if not verbosity.level():
            return '', ''

        # Initialise.
        pre_string = ''
        stdout_string = ''
        stderr_string = ''
        rank = self.rank()

        # Start of the slave string.
        if self.processor_size() > 1 and rank > 0:
            pre_string = self.rank_format_string() % rank

        # Start of the master string.
        elif self.processor_size() > 1 and rank == 0:
            pre_string = 'M'*self.rank_format_string_width()

        # For multi-processors, the STDOUT and STDERR indicators, and the separator.
        if self.processor_size() > 1:
            stderr_string = pre_string + ' E| '
            stdout_string = pre_string + '  | '

        # Return the strings to prepend to the STDOUT and STDERR streams.
        return stdout_string, stderr_string

    def get_time_delta(self, start_time, end_time):
        """Utility function called to format the difference between application start and end times.

        @todo:  Check my format is correct.

        @param start_time:  The time the application started in seconds since the epoch.
        @type start_time:   float
        @param end_time:    The time the application ended in seconds since the epoch.
        @type end_time:     float
        @return:            The time difference in the format 'hours:minutes:seconds'.
        @rtype:             str
        """

        time_diff = end_time - start_time
        time_delta = datetime.timedelta(seconds=time_diff)

        # Strip off the fractional seconds, if present (str(timedelta) only contains a '.' when there are microseconds).
        time_delta_str = str(time_delta)
        if '.' in time_delta_str:
            time_delta_str = time_delta_str.split('.', 1)[0]
        return time_delta_str

    def master_queue_command(self, command, dest):
        """Inter-processor data transfer - send a command to the processor of the given rank.

        In this class, this is invoked by the master processor from run_command_queue() to
        dispatch queued commands to the slave processors.


        @param command: The command to send.
        @type command:  Command instance
        @param dest:    The destination processor's rank.
        @type dest:     int
        """

        raise_unimplemented(self.master_queue_command)

    def master_receive_result(self):
        """Slave to master processor data transfer - receive the result command from the slave.

        This is invoked by the master processor.

        @return:    The result command sent by the slave.
        @rtype:     Result_command instance
        """

        raise_unimplemented(self.master_receive_result)

    def post_run(self):
        """Method called after the application main loop has finished - designed for overriding.

        The default implementation outputs the application runtime to STDOUT.  All subclasses
        should call the base method as their last action via super().  Only called on the master
        on normal exit from the application's run loop.
        """

        if self.rank() == 0:
            end_time = time.time()
            time_delta_str = self.get_time_delta(self.start_time, end_time)

            # Print out of the total run time.
            if verbosity.level():
                print('\nOverall runtime: ' + time_delta_str + '\n')

    def pre_run(self):
        """Method called before starting the application main loop - designed for overriding.

        The default implementation just saves the start time for application timing.  All
        subclasses should call the base method via super().  Only called on the master.
        """

        if self.rank() == 0:
            self.start_time = time.time()

    def processor_size(self):
        """Get the number of slave processors - designed for overriding.

        @return:    The number of slave processors.
        @rtype:     int
        """

        return self._processor_size

    def rank(self):
        """Get the rank of this processor - an abstract method.

        The rank of the processor should be a number between 0 and n, where n is the number of
        slave processors.  The rank of 0 is reserved for the master processor.

        @return:    The rank of the processor.
        @rtype:     int
        """

        raise_unimplemented(self.rank)

    def rank_format_string(self):
        """Get the format string used to print the rank of a slave.

        Only called on slaves.

        @return:    The format string designating the rank of the slave.
        @rtype:     str
        """

        digits = self.rank_format_string_width()
        format = '%%%di' % digits
        return format

    def rank_format_string_width(self):
        """Get the width of the string designating the rank of a slave process.

        Typically this will be the number of digits in the biggest slave's rank.

        @return:    The number of digits in the biggest slave processor's rank.
        @rtype:     int
        """

        # The number of digits of the highest slave rank, i.e. floor(log10(n)) + 1.
        return int(math.floor(math.log10(self.processor_size()))) + 1

    def return_object(self, result):
        """Return a result to the master processor from a slave - an abstract method.

        @param result:  A result to be returned to the master processor.
        @type result:   Result_string, Result_command or Exception instance

        @see: multi.processor.Result_string.
        @see: multi.processor.Result_command.
        """

        raise_unimplemented(self.return_object)

    def run(self):
        """Run the processor - an abstract method.

        This function runs the processor main loop and is called after all processor setup has
        been completed.  It performs remote execution setup and teardown (via self.pre_run() and
        self.post_run()) on either side of a call to Application_callback.init_master.

        @see: multi.processor.Application_callback.
        """

        # Execute any setup code needed for the specific processor fabrics.
        self.pre_run()

        # Execution of the master processor.
        if self.on_master():
            # Execute the program's run() method, as specified by the Application_callback.
            try:
                self.callback.init_master(self)

            # Allow sys.exit() calls.
            except SystemExit:
                # Allow the processor fabric to clean up.
                self.exit()

                # Continue with the sys.exit().
                raise

            # Handle all errors nicely.
            except Exception:
                e = sys.exc_info()[1]
                self.callback.handle_exception(self, e)

        # Execution of the slave processor.
        else:
            # Loop until the slave is asked to die via an Exit_command setting the do_quit flag.
            while not self.do_quit:
                # Execute the slave by catching commands, catching all exceptions.
                try:
                    # Fetch any commands on the queue.
                    commands = self.slave_receive_commands()

                    # Convert to a list, if needed.
                    if not isinstance(commands, list):
                        commands = [commands]

                    # Initialise the results list.
                    if self.batched_returns:
                        self.result_list = []
                    else:
                        self.result_list = None

                    # Execute each command, one by one.
                    for i, command in enumerate(commands):
                        # Capture the standard IO streams for the slaves.
                        self.stdio_capture()

                        # Set the completed flag if this is the last command.
                        completed = (i == len(commands)-1)

                        # Execute the calculation.
                        command.run(self, completed)

                        # Restore the IO.
                        self.stdio_restore()

                    # Process the batched results.
                    if self.batched_returns:
                        self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data))
                        self.result_list = None

                # Capture and process all slave exceptions.
                except:
                    capturing_exception = Capturing_exception(rank=self.rank(), name=self.get_name())
                    exception_result = Result_exception(exception=capturing_exception, processor=self, completed=True)

                    self.return_object(exception_result)
                    self.result_list = None

        # Execute any tear down code needed for the specific processor fabrics.
        self.post_run()

        # End of execution, so perform any exiting actions needed by the specific processor fabrics.
        if self.on_master():
            self.exit()

    def run_command_globally(self, command):
        """Run the same command on all slave processors.

        @see: multi.processor.Slave_command.

        @param command: A slave command.
        @type command:  Slave_command instance
        """

        queue = [command for i in range(self.processor_size())]
        self.run_command_queue(queue)

    def run_command_queue(self, queue):
        """Process all commands on the queue and wait for completion.

        @param queue:   The command queue.
        @type queue:    list of Command instances
        """

        # This must only be run on the master processor.
        self.assert_on_master()

        running_set = set()
        idle_set = set([i for i in range(1, self.processor_size()+1)])

        if self.threaded_result_processing:
            result_queue = Threaded_result_queue(self)
        else:
            result_queue = Immediate_result_queue(self)

        while len(queue) != 0:
            # Dispatch commands to all idle slaves.
            while len(idle_set) != 0:
                if len(queue) != 0:
                    command = queue.pop()
                    dest = idle_set.pop()
                    self.master_queue_command(command=command, dest=dest)
                    running_set.add(dest)
                else:
                    break

            # Loop until the queue of calculations is depleted.
            while len(running_set) != 0:
                # Get the result.
                result = self.master_receive_result()

                # Debugging printout.
                if verbosity.level():
                    print('\nIdle set:    %s' % idle_set)
                    print('Running set: %s' % running_set)

                # Shift the processor rank to the idle set.
                if result.completed:
                    idle_set.add(result.rank)
                    running_set.remove(result.rank)

                # Add to the result queue for instant or threaded processing.
                result_queue.put(result)

        # Process the threaded results.
        if self.threaded_result_processing:
            result_queue.run_all()

    def run_queue(self):
        """Run the processor queue - an abstract method.

        All commands queued with add_to_queue will be executed.  This function causes the current
        thread to block until the commands have completed.
        """

        #FIXME: need a finally here to cleanup exceptions states
        lqueue = self.chunk_queue(self.command_queue)
        self.run_command_queue(lqueue)

        del self.command_queue[:]
        self.memo_map.clear()

    def send_data_to_slaves(self, name=None, value=None):
        """Transfer the given data from the master to all slaves.

        @keyword name:  The name of the data structure to store.
        @type name:     str
        @keyword value: The data structure.
        @type value:    anything
        """

        # This must be the master processor!
        self.assert_on_master()

        # Create the command list.
        for i in range(self.processor_size()):
            # Create and append the command.
            command = Slave_storage_command()

            # Add the data to the command.
            command.add(name, value)

            # Add the command to the queue.
            self.add_to_queue(command)

        # Flush the queue.
        self.run_queue()

    def stdio_capture(self):
        """Enable capture of STDOUT and STDERR.

        This is currently used to capture the IO streams of the slaves to return back to the master.
        """

        # Store the original STDOUT and STDERR for restoring later on.
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr

        # The data object.
        self.io_data = []

        # Get the strings to prepend to the IO streams.
        pre_strings = self.get_stdio_pre_strings()

        # Then redirect IO.
        sys.stdout = Redirect_text(self.io_data, token=pre_strings[0], stream=0)
        sys.stderr = Redirect_text(self.io_data, token=pre_strings[1], stream=1)

    def stdio_restore(self):
        """Restore the original STDOUT and STDERR streams."""

        # Restore the original streams.
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr