24 """The processor class is the central class in the multi python multiprocessor framework.
25
26 Overview
27 ========
28
29 The framework has two main responsibilities:
30
31 1. Process management - if needed the processor can create the slave processes it manages if
32 they haven't been created by the operating system. It is also responsible for reporting
33 exceptions and shutting down the multiprocessor in the face of errors.
34 2. Scheduling commands on the slave processors via an interprocess communication fabric (MPI,
35 PVM, threads etc) and processing returned text and result commands.
36
37
38 Using the processor framework
39 =============================
40
41 Users of the processor framework will typically use the following methodology:
42
43 1. At application startup determine the name of the required processor implementation and the number of slave processors requested.
44
45 2. Create an Application_callback object. For example:
46 relax_instance = Relax()
47 callbacks = Application_callback(master=relax_instance)
48
49 3. Dynamically load a processor implementation using the name of the processor and the number of required slave processors. For example:
50 processor = Processor.load_multiprocessor(relax_instance.multiprocessor_type, callbacks, processor_size=relax_instance.n_processors)
51
52 4. Call run on the processor instance returned above and handle all Exceptions. For example:
53 processor.run()
54
5. After calling run, the processor will call back to Application_callback.init_master, from which you should call your main program (Application_callback defaults to self.master.run()).
56
6. Once in the main program, call processor.add_to_queue with each multi.Slave_command object you wish to run across the slave processor pool, then call processor.run_queue to execute the commands remotely.  The call blocks until the commands have completed.

7. The Slave_commands will then run remotely on the slaves.  Any exceptions thrown, and any Result_commands queued via processor.return_object, are returned to the master processor, where they are handled or executed.  The slave processors also capture the STDOUT and STDERR streams and return their contents as strings for display on the master's STDOUT and STDERR streams.
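
The queueing pattern of steps 6 and 7 can be sketched as follows.  This is only an illustration of the calling convention: Toy_command and Toy_processor are hypothetical stand-ins that run everything in the current process, and are not part of the multi package.

```python
# Hypothetical stand-ins: Toy_command and Toy_processor are NOT part of the
# multi package; they only mimic the add_to_queue()/run_queue() calling pattern.

class Toy_command:
    # Plays the role of a slave command: it holds the work to perform.
    def __init__(self, value):
        self.value = value

    def run(self):
        return self.value ** 2


class Toy_processor:
    # Runs queued commands in-process instead of on remote slaves.
    def __init__(self):
        self.command_queue = []
        self.results = []

    def add_to_queue(self, command):
        self.command_queue.append(command)

    def run_queue(self):
        # Block until every queued command has run, then empty the queue.
        for command in self.command_queue:
            self.results.append(command.run())
        del self.command_queue[:]


processor = Toy_processor()
for value in range(4):
    processor.add_to_queue(Toy_command(value))
processor.run_queue()
```

In the real framework the commands would be shipped to the slaves and their results returned as result commands; here the results are simply collected in a list.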
62
63
64 Extending the processor framework with a new interprocess communication fabric
65 ==============================================================================
66
The processor class acts as a base class that defines all the commands that a processor implementing
a new interprocess communication fabric needs.  All that is required is to implement a subclass of
processor providing the required methods (of course, as Python provides dynamic 'duck typing', you
can always implement a class with the same set of methods and it will also work).  Currently
processor classes are loaded from the multi package and have fully qualified names of the form:
73
74 >>> multi.<type>_processor.<Type>_processor
75
76 where <Type> is the name of the processor with the correct capitalisation e.g.
77
>>> processor_name = 'mpi4py'
>>> callback = My_application_callback()
>>> processor_size = 6
>>> Processor.load_multiprocessor(processor_name, callback, processor_size)

will load multi.mpi4py_processor.Mpi4py_processor.
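
The naming convention above could be resolved along the following lines.  This is a minimal sketch, not the actual body of load_multiprocessor: the helper names are hypothetical, and str.capitalize() is only an assumption about what "the correct capitalisation" means.

```python
import importlib


def processor_names(processor_name):
    # Derive (module name, class name) from the processor type, following the
    # multi.<type>_processor.<Type>_processor convention.
    module_name = 'multi.%s_processor' % processor_name
    class_name = '%s_processor' % processor_name.capitalize()
    return module_name, class_name


def load_processor_class(processor_name):
    # Import the module and fetch the class.  Raises ImportError or
    # AttributeError if either is missing.
    module_name, class_name = processor_names(processor_name)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)
```

Keeping the name derivation separate from the import makes the convention easy to check without having the processor modules installed.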
84
85
86 TODO
87 ====
88
89 The following are yet to be implemented:
90
91 1. There is no ability of the processor to request command line arguments.
92
93 2. The processor can't currently be loaded from somewhere other than the multi directory.
94
95 """
96
97
98
99
100
101 import time, datetime, math, sys
102
103
104 from multi.misc import Capturing_exception, raise_unimplemented, Verbosity; verbosity = Verbosity()
105 from multi.result_queue import Threaded_result_queue
106 from multi.processor_io import Redirect_text
107 from multi.result_commands import Batched_result_command, Null_result_command, Result_exception
108 from multi.slave_commands import Slave_storage_command
109
110
112 """A special Processor specific data storage container."""
113
114
116 """The central class of the multi processor framework.
117
    This provides facilities for process management, command queueing, command scheduling, remote
    execution of commands, and handling of results and errors from commands.  The class is abstract
    and should be overridden to implement new interprocess communication methods.  Even then, users
    are encouraged to override the more fully implemented multi.multi_processor.Multi_processor
    class.  Most users should create instances of this class by calling the static method
    Processor.load_multiprocessor.

    The class is designed to be subclassed and has abstract methods that a subclass needs to
    override.  Methods which can be overridden are clearly marked with a note annotation stating that
    they can be overridden.

    @todo:  It may be a good idea to separate out the features of the class that purely deal with the
            interprocess communication fabric.
    @todo:  The processor can't currently harvest the required command line arguments from the
            current command line.
133 """
134
135
    def __init__(self, processor_size, callback):
137 """Initialise the processor.
138
        @param processor_size:  The requested number of slave processors.  If the number of
                                processors is set by the environment (e.g. in the case of MPI via
                                mpiexec -np <n-processors> on the command line), the processor is
                                free to ignore this value.  The default value from the command line
                                is -1, and subclasses receiving this value should either raise an
                                exception or determine the correct number of slaves to create (e.g.
                                on a multi-cored machine using a threaded implementation the correct
                                number of slaves would be equal to the number of cores available).
147 @type processor_size: int
148 @param callback: The application callback which allows the host application to start
149 its main loop and handle exceptions from the processor.
150 @type callback: multi.processor.Application_callback instance
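
One way a subclass might interpret the -1 default is sketched below.  The helper resolve_processor_size is hypothetical and not part of this class; it assumes a threaded implementation where one slave per core is wanted.

```python
import os


def resolve_processor_size(requested):
    # Hypothetical helper: interpret -1 as "choose the size for me".
    if requested == -1:
        # os.cpu_count() may return None when the count cannot be determined.
        cores = os.cpu_count()
        if cores is None:
            raise ValueError('cannot determine the number of cores')
        return cores
    if requested < 1:
        raise ValueError('invalid processor size: %d' % requested)
    return requested
```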
151 """
152
153 self.callback = callback
154 """Callback to interface to the host application
155
156 @see: Application_callback."""
157
158 self.grainyness = 1
159 """The number of sub jobs to queue for each processor if we have more jobs than processors."""
160
161
162
163
164
165
166
167
168
169 self.NULL_RESULT = Null_result_command(processor=self)
170 """Empty result command used by commands which do not return a result (a singleton?)."""
171
172
173 self.data_store = Data_store()
174 """The processor data store."""
175
176 self._processor_size = processor_size
177 """Number of slave processors available in this processor."""
178
179 self.threaded_result_processing = True
180 """Flag for the handling of result processing via self.run_command_queue()."""
181
182
184 """Shutdown the multi processor in exceptional conditions - designed for overriding.
185
        This method is called after an exception from the master or slave has been raised and processed, and is responsible for shutting down the multi processor fabric and terminating the application.  It should be called as the last thing that Application_callback.handle_exception does.

        As an example of the method's use, see Mpi4py_processor.abort, which calls MPI.COMM_WORLD.Abort() to cleanly shut down the MPI framework and remove dangling processes.
189
190 The default action is to call the special self.exit() method.
191
192 @see: multi.processor.Application_callback.
193 @see: multi.mpi4py_processor.Mpi4py_processor.abort().
194 @see: mpi4py.MPI.COMM_WORLD.Abort().
195 """
196
197 self.exit()
198
199
201 """Add a command for remote execution to the queue - an abstract method.
202
203 @see: multi.processor.Slave_command
204 @see: multi.processor.Result_command
205 @see: multi.processor.Memo
206
207 @param command: A command to execute on a slave processor.
208 @type command: ? subclass instance
        @keyword memo:  A place to store data needed on command completion (e.g. where to save the
                        results).  The data stored in the memo is provided to the Result_commands
                        generated by the submitted command.
212 @type memo: Memo subclass instance
213 """
214
215 raise_unimplemented(self.add_to_queue)
216
217
219 """Make sure that this is the master processor and not a slave.
220
221 @raises Exception: If not on the master processor.
222 """
223
224 raise_unimplemented(self.assert_on_master)
225
226
    def exit(self, status=0):
228 """Exit the processor with the given status.
229
230 This default method allows the program to drop off the end and terminate as it normally would - i.e. this method does nothing.
231
232 @keyword status: The program exit status.
233 @type status: int
234 """
235
236
238 """Fetch the data structure of the given name from the data store.
239
240 This can be run on the master or slave processors.
241
242
243 @keyword name: The name of the data structure to fetch.
244 @type name: str
245 @return: The value of the associated data structure.
246 @rtype: anything
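
The lookup amounts to attribute access on the data store.  A minimal sketch, using a hypothetical Toy_store class in place of the real data store:

```python
class Toy_store:
    # Hypothetical stand-in for the processor data store: a bare attribute container.
    pass


def fetch(store, name):
    # getattr() raises AttributeError if nothing was stored under this name.
    return getattr(store, name)


store = Toy_store()
setattr(store, 'spin_count', 12)
value = fetch(store, 'spin_count')
```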
247 """
248
249
250 obj = getattr(self.data_store, name)
251
252
253 return obj
254
255
257 """Get a string describing the multi processor - designed for overriding.
258
259 The string should be suitable for display at application startup and should be less than 100
260 characters wide. A good example is the string returned by mpi4py_processor:
261
262 >>> MPI running via mpi4py with <n> slave processors & 1 master, mpi version = <x>.<y>
263
        @see: multi.mpi4py_processor.Mpi4py_processor.get_intro_string.
265
266 @return: A string describing the multi processor.
267 @rtype: str
268 """
269
270 raise_unimplemented(self.get_intro_string)
271
272
274 """Get the name of the current processor - an abstract method.
275
276 The string should identify the current master or slave processor uniquely but is purely for
277 information and debugging. For example the mpi implementation uses the string
278 <host-name>-<process-id> whereas the thread implementation uses the id of the current thread
279 as provided by python.
280
281 @return: The processor identifier.
282 @rtype: str
283 """
284
285 raise_unimplemented(self.get_name)
286
287
        """Get the strings used to prefix STDOUT and STDERR, dependent on the current rank.

        For processors with only one slave the result should be ('', '') - designed for overriding.

        @note:  The defaults are ('M S|', 'M E|') and ('NN S|' , 'NN E|') for masters and slaves
                respectively with NN replaced by the rank of the processor.

        @return:    A tuple of two strings for prepending to each line of STDOUT and STDERR.
        @rtype:     tuple of 2 str
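
The prefix construction performed by this method can be sketched as a free function.  The zero-padded rank format and the default width of 2 are assumptions, since rank_format_string() and rank_format_string_width() are defined elsewhere, and the verbosity check is left out.

```python
def pre_strings(rank, n_processors, width=2):
    # With a single processor there is nothing to distinguish, so no prefix.
    if n_processors <= 1:
        return '', ''

    if rank == 0:
        # The master is marked with a run of 'M' characters.
        pre = 'M' * width
    else:
        # Slaves are marked with their zero-padded rank (assumed format).
        pre = '%0*d' % (width, rank)

    # The STDOUT prefix first, then the STDERR prefix.
    return pre + ' | ', pre + ' E| '
```

For example, pre_strings(3, 4) gives the pair ('03 | ', '03 E| ') under these assumptions.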
298 """
299
300
301 if not verbosity.level():
302 return '', ''
303
304
305 pre_string = ''
306 stdout_string = ''
307 stderr_string = ''
308 rank = self.rank()
309
310
311 if self.processor_size() > 1 and rank > 0:
312 pre_string = self.rank_format_string() % rank
313
314
315 elif self.processor_size() > 1 and rank == 0:
316 pre_string = 'M'*self.rank_format_string_width()
317
318
319 if self.processor_size() > 1:
320 stderr_string = pre_string + ' E| '
321 stdout_string = pre_string + ' | '
322
323
324 return stdout_string, stderr_string
325
326
328 """Utility function called to format the difference between application start and end times.
329
330 @todo: Check my format is correct.
331
332 @param start_time: The time the application started in seconds since the epoch.
333 @type start_time: float
334 @param end_time: The time the application ended in seconds since the epoch.
335 @type end_time: float
336 @return: The time difference in the format 'hours:minutes:seconds'.
337 @rtype: str
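
A standalone sketch of the formatting is given below.  The function name format_time_delta is hypothetical.  It relies on str() of a datetime.timedelta, which only includes a fractional part when the microseconds are non-zero, so the split must tolerate a missing '.'.

```python
import datetime


def format_time_delta(start_time, end_time):
    delta = datetime.timedelta(seconds=end_time - start_time)

    # str(delta) is 'H:MM:SS' or 'H:MM:SS.ffffff'; keep the part before any '.'.
    return str(delta).split('.', 1)[0]
```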
338 """
339
        time_diff = end_time - start_time
        time_delta = datetime.timedelta(seconds=time_diff)

        # Drop any fractional seconds; str() omits the '.' when there are none.
        time_delta_str = str(time_delta).split('.', 1)[0]
        return time_delta_str
345
346
348 """Determine if any slave commands are queued.
349
350 @return: True if slave commands are in the queue, False otherwise.
351 @rtype: bool
352 """
353
354
355 if len(self.command_queue):
356 return True
357
358
359 return False
360
361
363 """Slave to master processor data transfer - send the result command from the slave.
364
365 This is invoked by the slave processor.
366
367
368 @param command: The results command to send to the master.
        @type command:      Result_command instance
370 @param dest: The destination processor's rank.
371 @type dest: int
372 """
373
374 raise_unimplemented(self.master_queue_command)
375
376
378 """Slave to master processor data transfer - receive the result command from the slave.
379
380 This is invoked by the master processor.
381
382 @return: The result command sent by the slave.
383 @rtype: Result_command instance
384 """
385
386 raise_unimplemented(self.master_receive_result)
387
388
    def post_run(self):
390 """Method called after the application main loop has finished - designed for overriding.
391
        The default implementation outputs the application runtime to STDOUT.  All subclasses should
        call the base method as their last action via super().  Only called on the master on normal
        exit from the application's run loop.
395 """
396
397 if self.rank() == 0:
398 end_time = time.time()
399 time_delta_str = self.get_time_delta(self.start_time, end_time)
400
401
402 if verbosity.level():
403 print('\nOverall runtime: ' + time_delta_str + '\n')
404
405
407 """Method called before starting the application main loop - designed for overriding.
408
409 The default implementation just saves the start time for application timing. All subclasses
410 should call the base method via super(). Only called on the master.
411 """
412
413 if self.rank() == 0:
414 self.start_time = time.time()
415
416
418 """Get the number of slave processors - designed for overriding.
419
420 @return: The number of slave processors.
421 @rtype: int
422 """
423
424 return self._processor_size
425
426
428 """Get the rank of this processor - an abstract method.
429
        The rank of the processor should be a number between 0 and n, where n is the number of slave
        processors.  The rank of 0 is reserved for the master processor.
432
433 @return: The rank of the processor.
434 @rtype: int
435 """
436
437 raise_unimplemented(self.rank)
438
439
452
453
464
465
467 """Return a result to the master processor from a slave - an abstract method.
468
469 @param result: A result to be returned to the master processor.
470 @type result: Result_string, Result_command or Exception instance
471
472 @see: multi.processor.Result_string.
        @see: multi.processor.Result_command.
474 """
475
476 raise_unimplemented(self.return_object)
477
478
480 """Run the processor - an abstract method.
481
        This function runs the processor main loop and is called after all processor setup has been completed.  It performs remote execution setup and teardown (via self.pre_run() and self.post_run()) on either side of a call to Application_callback.init_master.
483
484 @see: multi.processor.Application_callback.
485 """
486
487
488 self.pre_run()
489
490
491 if self.on_master():
492
493 try:
494 self.callback.init_master(self)
495
496
497 except SystemExit:
498
499 self.exit()
500
501
502 raise
503
504
505 except Exception:
506 e = sys.exc_info()[1]
507 self.callback.handle_exception(self, e)
508
509
510 else:
511
512 while not self.do_quit:
513
514 try:
515
516 commands = self.slave_receive_commands()
517
518
519 if not isinstance(commands, list):
520 commands = [commands]
521
522
523 if self.batched_returns:
524 self.result_list = []
525 else:
526 self.result_list = None
527
528
529 self.stdio_capture()
530
531
532 for i, command in enumerate(commands):
533
534 completed = (i == len(commands)-1)
535
536
537 command.run(self, completed)
538
539
540 self.stdio_restore()
541
542
543 if self.batched_returns:
544 self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data))
545 self.result_list = None
546
547
548 except:
549 capturing_exception = Capturing_exception(rank=self.rank(), name=self.get_name())
550 exception_result = Result_exception(exception=capturing_exception, processor=self, completed=True)
551
552 self.return_object(exception_result)
553 self.result_list = None
554
555
556 self.post_run()
557
558
559 if self.on_master():
560 self.exit()
561
562
564 """Run the same command on all slave processors.
565
566 @see: multi.processor.processor.Slave_command.
567
568 @param command: A slave command.
569 @type command: Slave_command instance
570 """
571
572 queue = [command for i in range(self.processor_size())]
573 self.run_command_queue(queue)
574
575
626
627
629 """Run the processor queue - an abstract method.
630
        All commands queued with add_to_queue will be executed.  This function causes the current
        thread to block until all of the commands have completed.
633 """
634
635
636 lqueue = self.chunk_queue(self.command_queue)
637 self.run_command_queue(lqueue)
638
639 del self.command_queue[:]
640 self.memo_map.clear()
641
642
668
669
671 """Enable capture of the STDOUT and STDERR.
672
673 This is currently used to capture the IO streams of the slaves to return back to the master.
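
The capture and restore cycle can be sketched in isolation as follows.  Tagging_writer is a hypothetical stand-in for Redirect_text, whose real behaviour is defined in multi.processor_io; storing (text, stream) pairs is an assumption made for the illustration.

```python
import sys


class Tagging_writer:
    # Hypothetical stand-in for Redirect_text: stores (text, stream) pairs.
    def __init__(self, store, token='', stream=0):
        self.store = store
        self.token = token
        self.stream = stream

    def write(self, text):
        # Prefix each non-empty line with the token and record the stream id.
        for line in text.splitlines():
            if line:
                self.store.append((self.token + line, self.stream))

    def flush(self):
        pass


def capture(func, pre_strings=('', '')):
    # Swap in the tagging writers, run func, and always restore the originals.
    io_data = []
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    sys.stdout = Tagging_writer(io_data, token=pre_strings[0], stream=0)
    sys.stderr = Tagging_writer(io_data, token=pre_strings[1], stream=1)
    try:
        func()
    finally:
        sys.stdout, sys.stderr = orig_stdout, orig_stderr
    return io_data


def job():
    print('hello')
    print('oops', file=sys.stderr)


captured = capture(job, ('01 | ', '01 E| '))
```

Restoring the streams in a finally clause is a design choice of this sketch, so that an exception in the job cannot leave the redirection in place.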
674 """
675
676
677 self.orig_stdout = sys.stdout
678 self.orig_stderr = sys.stderr
679
680
681 self.io_data = []
682
683
684 pre_strings = self.get_stdio_pre_strings()
685
686
687 sys.stdout = Redirect_text(self.io_data, token=pre_strings[0], stream=0)
688 sys.stderr = Redirect_text(self.io_data, token=pre_strings[1], stream=1)
689
690
692 """Restore the original STDOUT and STDERR streams."""
693
694
695 sys.stdout = self.orig_stdout
696 sys.stderr = self.orig_stderr
697