"""The processor class is the central class in the multi python multiprocessor framework.

Overview
========

The framework has two main responsibilities:

    1. Process management - if needed, the processor can create the slave processes it manages
       if they have not already been created by the operating system.  It is also responsible for
       reporting exceptions and shutting down the multiprocessor in the face of errors.
    2. Scheduling commands on the slave processors via an interprocess communication fabric (MPI,
       PVM, threads, etc.) and processing the returned text and result commands.


Using the processor framework
=============================

Users of the processor framework will typically use the following methodology:

    1. At application startup, determine the name of the required processor implementation and
       the number of slave processors requested.

    2. Create an Application_callback object.  For example:

       >>> relax_instance = Relax()
       >>> callbacks = Application_callback(master=relax_instance)

    3. Dynamically load a processor implementation using the name of the processor and the number
       of required slave processors.  For example:

       >>> processor = Processor.load_multiprocessor(relax_instance.multiprocessor_type, callbacks, processor_size=relax_instance.n_processors)

    4. Call run on the processor instance returned above and handle all exceptions.  For example:

       >>> processor.run()

    5. After run is called, the processor calls back to Application_callback.init_master, from
       which you should call your main program (by default, Application_callback.init_master
       calls self.master.run()).

    6. Once in the main program, call processor.add_to_queue with each of the multi.Slave_command
       objects you wish to run across the slave processor pool, and then call processor.run_queue
       to execute the commands remotely.  run_queue blocks until all queued commands have
       completed.  For example, with commands being a list of Slave_command instances:

       >>> for command in commands:
       ...     processor.add_to_queue(command)
       >>> processor.run_queue()

    7. The Slave_command objects then run remotely on the slaves.  Any exceptions they raise, and
       any Result_command objects they queue via processor.return_object, are returned to the
       master processor, where they are handled or executed.  The slave processors also capture
       their STDOUT and STDERR streams and return the contents as strings for display on the
       master's STDOUT and STDERR streams.
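
A Slave_command subclass for step 6 can be sketched as below.  This is a hypothetical example:
the Square_command name is for illustration only, and the sketch assumes that Slave_command can
be imported from multi and that its constructor takes no arguments.  The run() signature matches
the command.run(processor, completed) call made by Processor.run().  As a further assumption,
based on the NULL_RESULT attribute set up in Processor.__init__, the command signals that it has
no result by passing processor.NULL_RESULT to processor.return_object():

    >>> from multi import Slave_command
    >>> class Square_command(Slave_command):
    ...     def __init__(self, x):
    ...         Slave_command.__init__(self)
    ...         self.x = x
    ...     def run(self, processor, completed):
    ...         # This output is captured on the slave and returned to the master.
    ...         print(self.x**2)
    ...         # Assumed convention for commands without a result.
    ...         processor.return_object(processor.NULL_RESULT)
    >>> processor.add_to_queue(Square_command(3))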


Extending the processor framework with a new interprocess communication fabric
==============================================================================

The processor class acts as a base class that defines all the methods that a processor
implementing a new interprocess communication fabric needs.  All that is required is to implement
a subclass of Processor providing the required methods (of course, as Python provides dynamic
typing and polymorphism via 'duck typing', you can always implement a class with the same set of
methods and it will also work).  Currently, processor classes are loaded from modules within the
multi package, and the module and class names have the form:

    >>> multi.<type>_processor.<Type>_processor

where <Type> is the name of the processor with the correct capitalisation, e.g.

    >>> processor_name = 'mpi4py'
    >>> callback = My_application_callback()
    >>> processor_size = 6
    >>> Processor.load_multiprocessor(processor_name, callback, processor_size)

will load multi.mpi4py_processor.Mpi4py_processor.
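
As a sketch of the subclassing described above, a new fabric might start from a skeleton such as
the one below.  Everything in it is hypothetical: the Example_processor name, the
multi/example_processor.py module it would need to live in to follow the naming scheme above,
and the master-only, single-process behaviour.  It only fills in the identification methods; a
working fabric must also implement the remaining abstract methods of this class (for example
add_to_queue(), master_queue_command(), master_receive_result() and return_object()), and
Processor.run() additionally calls on_master() and slave_receive_commands(), which a fabric may
also need to provide:

    >>> import os, socket
    >>> from multi.processor import Processor
    >>> class Example_processor(Processor):
    ...     def get_intro_string(self):
    ...         return 'Example processor with %d slave processors' % self.processor_size()
    ...     def get_name(self):
    ...         # Mirror the MPI <host-name>-<process-id> naming scheme.
    ...         return '%s-%s' % (socket.gethostname(), os.getpid())
    ...     def rank(self):
    ...         # This sketch only ever runs as the master.
    ...         return 0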


TODO
====

The following are yet to be implemented:

    1. The processor cannot request command line arguments.

    2. The processor cannot currently be loaded from anywhere other than the multi directory.

"""


# Python module imports.
import time, datetime, math, sys

# multi module imports.
from multi.misc import Capturing_exception, raise_unimplemented, Verbosity; verbosity = Verbosity()
from multi.result_queue import Threaded_result_queue
from multi.processor_io import Redirect_text
from multi.result_commands import Batched_result_command, Null_result_command, Result_exception
from multi.slave_commands import Slave_storage_command


class Data_store:
    """A special Processor specific data storage container."""


class Processor(object):
    """The central class of the multi processor framework.

    This provides facilities for process management, command queueing, command scheduling, remote
    execution of commands, and handling of results and errors from commands.  The class is abstract
    and should be overridden to implement new interprocess communication methods.  However, even
    then, users are encouraged to override the more fully implemented
    multi.multi_processor.Multi_processor class.  Most users should obtain instances of this class
    by calling the static method Processor.load_multiprocessor.

    The class is designed to be subclassed and has abstract methods that a subclass needs to
    override.  Methods which can be overridden are marked in the summary line of their docstring
    with either 'an abstract method' or 'designed for overriding'.

    @todo: It may be a good idea to separate out the features of the class that purely deal with
           the interprocess communication fabric.
    @todo: The processor can't currently harvest the required command line arguments from the
           current command line.
    """


    def __init__(self, processor_size, callback):
        """Initialise the processor.

        @param processor_size:  The requested number of slave processors.  If the number of
                                processors is set by the environment (e.g. in the case of MPI via
                                mpiexec -np <n-processors> on the command line), the processor is
                                free to ignore this value.  The default value from the command
                                line is -1, and on receiving this value subclasses should either
                                raise an exception or determine the correct number of slaves to
                                create (e.g. on a multi-core machine using a threaded
                                implementation, the correct number of slaves would be equal to the
                                number of cores available).
        @type processor_size:   int
        @param callback:        The application callback which allows the host application to
                                start its main loop and handle exceptions from the processor.
        @type callback:         multi.processor.Application_callback instance
        """

        self.callback = callback
        """Callback to interface to the host application.

        @see: Application_callback."""

        self.grainyness = 1
        """The number of sub jobs to queue for each processor if we have more jobs than processors."""

        self.NULL_RESULT = Null_result_command(processor=self)
        """Empty result command used by commands which do not return a result (a singleton?)."""

        self.data_store = Data_store()
        """The processor data store."""

        self._processor_size = processor_size
        """Number of slave processors available in this processor."""

        self.threaded_result_processing = True
        """Flag for the handling of result processing via self.run_command_queue()."""


    def abort(self):
        """Shutdown the multi processor in exceptional conditions - designed for overriding.

        This method is called after an exception from the master or slave has been raised and
        processed, and is responsible for shutting down the multi processor fabric and terminating
        the application.  It should be called as the last thing that
        Application_callback.handle_exception does.

        As an example of the method's use, see Mpi4py_processor.abort, which calls
        MPI.COMM_WORLD.Abort() to cleanly shut down the MPI framework and remove dangling
        processes.

        The default action is to call the special self.exit() method.

        @see: multi.processor.Application_callback.
        @see: multi.mpi4py_processor.Mpi4py_processor.abort().
        @see: mpi4py.MPI.COMM_WORLD.Abort().
        """

        self.exit()


    def add_to_queue(self, command, memo=None):
        """Add a command for remote execution to the queue - an abstract method.

        @see: multi.processor.Slave_command
        @see: multi.processor.Result_command
        @see: multi.processor.Memo

        @param command: A command to execute on a slave processor.
        @type command:  Slave_command subclass instance
        @keyword memo:  A place to store data needed on command completion (e.g. where to save the
                        results).  The data stored in the memo is provided to the Result_commands
                        generated by the submitted command.
        @type memo:     Memo subclass instance
        """

        raise_unimplemented(self.add_to_queue)


    def assert_on_master(self):
        """Make sure that this is the master processor and not a slave.

        @raises Exception: If not on the master processor.
        """

        raise_unimplemented(self.assert_on_master)


    def exit(self, status=0):
        """Exit the processor with the given status.

        This default method allows the program to drop off the end and terminate as it normally
        would, i.e. this method does nothing.

        @keyword status:    The program exit status.
        @type status:       int
        """


    def fetch_data(self, name=None):
        """Fetch the data structure of the given name from the data store.

        This can be run on the master or slave processors.

        @keyword name:  The name of the data structure to fetch.
        @type name:     str
        @return:        The value of the associated data structure.
        @rtype:         anything
        """

        # Get the object from the data store.
        obj = getattr(self.data_store, name)

        # Return the object.
        return obj


    def get_intro_string(self):
        """Get a string describing the multi processor - designed for overriding.

        The string should be suitable for display at application startup and should be less than
        100 characters wide.  A good example is the string returned by mpi4py_processor:

        >>> MPI running via mpi4py with <n> slave processors & 1 master, mpi version = <x>.<y>

        @see: multi.mpi4py_processor.Mpi4py_processor.get_intro_string.

        @return:    A string describing the multi processor.
        @rtype:     str
        """

        raise_unimplemented(self.get_intro_string)


    def get_name(self):
        """Get the name of the current processor - an abstract method.

        The string should identify the current master or slave processor uniquely but is purely for
        information and debugging.  For example, the MPI implementation uses the string
        <host-name>-<process-id>, whereas the thread implementation uses the id of the current
        thread as provided by Python.

        @return:    The processor identifier.
        @rtype:     str
        """

        raise_unimplemented(self.get_name)


    def get_stdio_pre_strings(self):
        """Get the strings used to prepend STDOUT and STDERR, dependent on the current rank.

        For processors with only one slave the result should be ('', '') - designed for overriding.

        @note: When there is more than one slave, the prefix for the master is 'M' repeated to the
               width given by self.rank_format_string_width(), and the prefix for each slave is its
               rank formatted with self.rank_format_string().  The prefix is followed by ' | ' for
               STDOUT and ' E| ' for STDERR.  If verbosity is turned off, ('', '') is returned.

        @return:    A tuple of two strings for prepending to each line of STDOUT and STDERR.
        @rtype:     tuple of 2 str
        """

        # No prepending of text when verbosity is turned off.
        if not verbosity.level():
            return '', ''

        # Initialise.
        pre_string = ''
        stdout_string = ''
        stderr_string = ''
        rank = self.rank()

        # Slave prefix: the formatted rank.
        if self.processor_size() > 1 and rank > 0:
            pre_string = self.rank_format_string() % rank

        # Master prefix: 'M' repeated to the rank width.
        elif self.processor_size() > 1 and rank == 0:
            pre_string = 'M'*self.rank_format_string_width()

        # Append the stream markers.
        if self.processor_size() > 1:
            stderr_string = pre_string + ' E| '
            stdout_string = pre_string + ' | '

        # Return the strings.
        return stdout_string, stderr_string


    def get_time_delta(self, start_time, end_time):
        """Utility function called to format the difference between application start and end times.

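        The formatting relies on str(datetime.timedelta), which only includes fractional seconds
        when they are non-zero and prefixes the number of days for differences of a day or more,
        as these illustrative values show:

        >>> import datetime
        >>> str(datetime.timedelta(seconds=3725.5))
        '1:02:05.500000'
        >>> str(datetime.timedelta(seconds=3725))
        '1:02:05'
        >>> str(datetime.timedelta(seconds=90000))
        '1 day, 1:00:00'
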
        @todo: Check my format is correct.

        @param start_time:  The time the application started in seconds since the epoch.
        @type start_time:   float
        @param end_time:    The time the application ended in seconds since the epoch.
        @type end_time:     float
        @return:            The time difference in the format 'hours:minutes:seconds', prefixed by
                            the number of days for differences of a day or more.
        @rtype:             str
        """

        # Convert the difference in seconds to a timedelta object.
        time_diff = end_time - start_time
        time_delta = datetime.timedelta(seconds=time_diff)

        # Strip any fractional seconds.  str() only adds the '.' when the microseconds are
        # non-zero, so take the first split element rather than unpacking two values, which
        # would raise a ValueError for whole-second differences.
        time_delta_str = str(time_delta).split('.', 1)[0]
        return time_delta_str


    def master_queue_command(self, command, dest):
        """Slave to master processor data transfer - send the result command from the slave.

        This is invoked by the slave processor.

        @param command: The results command to send to the master.
        @type command:  Result_command instance
        @param dest:    The destination processor's rank.
        @type dest:     int
        """

        raise_unimplemented(self.master_queue_command)


    def master_receive_result(self):
        """Slave to master processor data transfer - receive the result command from the slave.

        This is invoked by the master processor.

        @return:    The result command sent by the slave.
        @rtype:     Result_command instance
        """

        raise_unimplemented(self.master_receive_result)


    def post_run(self):
        """Method called after the application main loop has finished - designed for overriding.

        The default implementation outputs the application runtime to STDOUT.  All subclasses
        should call the base method as their last action via super().  Although called on all
        processors, the runtime is only reported on the master, on normal exit from the
        application's run loop.
        """

        # Only the master reports the runtime.
        if self.rank() == 0:
            end_time = time.time()
            time_delta_str = self.get_time_delta(self.start_time, end_time)

            # Print out the runtime, unless output is suppressed.
            if verbosity.level():
                print('\nOverall runtime: ' + time_delta_str + '\n')


    def pre_run(self):
        """Method called before starting the application main loop - designed for overriding.

        The default implementation just saves the start time for application timing.  All
        subclasses should call the base method via super().  Although called on all processors,
        the start time is only recorded on the master.
        """

        # Only the master records the start time.
        if self.rank() == 0:
            self.start_time = time.time()


    def processor_size(self):
        """Get the number of slave processors - designed for overriding.

        @return:    The number of slave processors.
        @rtype:     int
        """

        return self._processor_size


    def rank(self):
        """Get the rank of this processor - an abstract method.

        The rank of the processor should be a number between 0 and n, where n is the number of
        slave processors.  The rank of 0 is reserved for the master processor.

        @return:    The rank of the processor.
        @rtype:     int
        """

        raise_unimplemented(self.rank)


    def return_object(self, result):
        """Return a result to the master processor from a slave - an abstract method.

        @param result:  A result to be returned to the master processor.
        @type result:   Result_string, Result_command or Exception instance

        @see: multi.processor.Result_string.
        @see: multi.processor.Result_command.
        """

        raise_unimplemented(self.return_object)


    def run(self):
        """Run the processor.

        This function runs the processor main loop and is called after all processor setup has been
        completed.  It does remote execution setup and teardown (via self.pre_run() and
        self.post_run()) on either side of a call to Application_callback.init_master.

        @see: multi.processor.Application_callback.
        """

        # Execute any setup code needed for the specific processor fabric.
        self.pre_run()

        # Execution on the master processor.
        if self.on_master():
            # Execute the program's main loop.
            try:
                self.callback.init_master(self)

            # Allow sys.exit() calls.
            except SystemExit:
                # Allow the processor fabric to clean up.
                self.exit()

                # Continue with the sys.exit().
                raise

            # Handle all other errors via the application callback.
            except Exception:
                e = sys.exc_info()[1]
                self.callback.handle_exception(self, e)

        # Execution on the slave processors.
        else:
            # Loop until self.do_quit is set.
            while not self.do_quit:
                # Receive and execute commands, catching all exceptions so they can be returned
                # to the master.
                try:
                    # Fetch the commands on the queue.
                    commands = self.slave_receive_commands()

                    # Convert to a list, if needed.
                    if not isinstance(commands, list):
                        commands = [commands]

                    # Initialise the results list.
                    if self.batched_returns:
                        self.result_list = []
                    else:
                        self.result_list = None

                    # Execute each command, one by one.
                    for i, command in enumerate(commands):
                        # Capture the standard IO streams of the slave.
                        self.stdio_capture()

                        # Only the last command in the batch is flagged as completing it.
                        completed = (i == len(commands)-1)

                        # Execute the command.
                        command.run(self, completed)

                        # Restore the IO streams.
                        self.stdio_restore()

                    # Return the batched results.
                    if self.batched_returns:
                        self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data))
                        self.result_list = None

                # Capture all slave exceptions and return them to the master.
                except:
                    capturing_exception = Capturing_exception(rank=self.rank(), name=self.get_name())
                    exception_result = Result_exception(exception=capturing_exception, processor=self, completed=True)

                    self.return_object(exception_result)
                    self.result_list = None

        # Execute any teardown code needed for the specific processor fabric.
        self.post_run()

        # End of execution, so perform any exit actions needed by the specific processor fabric.
        if self.on_master():
            self.exit()


    def run_command_globally(self, command):
        """Run the same command on all slave processors.

        @see: multi.processor.Slave_command.

        @param command: A slave command.
        @type command:  Slave_command instance
        """

        # Queue the same command object once for each slave, then execute the queue.
        queue = [command for i in range(self.processor_size())]
        self.run_command_queue(queue)


    def run_queue(self):
        """Run the processor queue.

        All commands queued with add_to_queue will be executed.  This function causes the current
        thread to block until all of the commands have completed.
        """

        # Group the queued commands into chunks and execute them.
        lqueue = self.chunk_queue(self.command_queue)
        self.run_command_queue(lqueue)

        # Empty the queue and the memo map.
        del self.command_queue[:]
        self.memo_map.clear()


    def stdio_capture(self):
        """Enable capture of the STDOUT and STDERR.

        This is currently used to capture the IO streams of the slaves to return back to the master.
        """

        # Store the original STDOUT and STDERR for restoring later on.
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr

        # The data object shared by both redirected streams.
        self.io_data = []

        # Get the strings to prepend to the IO streams.
        pre_strings = self.get_stdio_pre_strings()

        # Redirect both streams into self.io_data.
        sys.stdout = Redirect_text(self.io_data, token=pre_strings[0], stream=0)
        sys.stderr = Redirect_text(self.io_data, token=pre_strings[1], stream=1)


    def stdio_restore(self):
        """Restore the original STDOUT and STDERR streams."""

        # Restore the original streams.
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr