"""The processor class is the central class in the multi Python multiprocessor framework.

Overview
========

The framework has two main responsibilities:

    1. Process management - if needed, the processor can create the slave processes it manages if
       they haven't been created by the operating system.  It is also responsible for reporting
       exceptions and shutting down the multiprocessor in the face of errors.
    2. Scheduling commands on the slave processors via an interprocess communication fabric (MPI,
       PVM, threads, etc.) and processing returned text and result commands.


Using the processor framework
=============================

Users of the processor framework will typically use the following methodology:

    1. At application startup, determine the name of the required processor implementation and the
       number of slave processors requested.

    2. Create an Application_callback object.  For example:
        relax_instance = Relax()
        callbacks = Application_callback(master=relax_instance)

    3. Dynamically load a processor implementation using the name of the processor and the number
       of required slave processors.  For example:
        processor = Processor.load_multiprocessor(relax_instance.multiprocessor_type, callbacks, processor_size=relax_instance.n_processors)

    4. Call run on the processor instance returned above and handle all Exceptions.  For example:
        processor.run()

    5. After calling run, the processor will call back to Application_callback.init_master, from
       which you should call your main program (Application_callback defaults to
       self.master.run()).

    6. Once in the main program, you should call processor.add_to_queue with a series of
       multi.Slave_command objects you wish to be run across the slave processor pool, and then
       call processor.run_queue to actually execute the commands remotely while blocking.
        >>>
        example here...

    7. Processor.Slave_commands will then run remotely on the slaves.  Any exceptions thrown and
       any processor.result_commands queued to processor.return_object will be returned to the
       master processor and handled or executed.  The slave processors also provide facilities for
       capturing the STDERR and STDOUT streams and returning their contents as strings for display
       on the master's STDOUT and STDERR streams.
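
The queue-then-run pattern of steps 6 and 7 can be sketched with toy stand-ins.  This is only an
illustration under stated assumptions:  Toy_processor and Square_command are hypothetical classes
that are not part of the multi package, everything runs in a single process, and the real
Slave_command.run() signature differs from the one used here.

```python
# Hypothetical stand-ins, NOT part of the multi package.  They only mimic the
# add_to_queue() / run_queue() calling pattern, with no remote execution.

class Square_command:
    # A toy command which squares a single number.
    def __init__(self, value):
        self.value = value

    def run(self):
        return self.value ** 2


class Toy_processor:
    # A single-process imitation of the queueing interface.
    def __init__(self):
        self.command_queue = []
        self.results = []

    def add_to_queue(self, command):
        # Commands are only stored here, nothing is executed yet.
        self.command_queue.append(command)

    def run_queue(self):
        # Block until every queued command has run, then empty the queue.
        for command in self.command_queue:
            self.results.append(command.run())
        del self.command_queue[:]


processor = Toy_processor()
for value in range(4):
    processor.add_to_queue(Square_command(value))
processor.run_queue()
print(processor.results)
```

In the real framework the commands would be distributed over the slave processors and their
results returned to the master as result commands, rather than collected in a simple list.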


Extending the processor framework with a new interprocess communication fabric
==============================================================================

The processor class acts as a base class that defines all the commands that a processor implementing
a new interprocess communication fabric needs.  All that is required is to implement a subclass of
processor providing the required methods (of course, as Python provides dynamic typing and 'duck
typing', you can always implement a class with the same set of methods and it will also work).
Currently processor classes are loaded from the processor module and live in modules with names of
the form:

>>> multi.<type>_processor.<Type>_processor

where <Type> is the name of the processor with the correct capitalisation, e.g.

>>> processor_name = 'mpi4py'
>>> callback = My_application_callback()
>>> processor_size = 6
>>> processor.load_multiprocessor(processor_name, callback, processor_size)

will load multi.mpi4py_processor.Mpi4py_processor.
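
As a rough sketch of this naming convention, the module and class names can be derived from the
processor name as follows.  The helper processor_class_path() is hypothetical and is not part of
the multi package, and the use of str.capitalize() to obtain <Type> is an assumption about what
'the correct capitalisation' means.

```python
def processor_class_path(processor_name):
    # Hypothetical helper:  build the names following the
    # multi.<type>_processor.<Type>_processor pattern.
    module_name = 'multi.%s_processor' % processor_name

    # Assumption:  <Type> is the name with only its first letter in upper case.
    class_name = '%s_processor' % processor_name.capitalize()

    return module_name, class_name


module_name, class_name = processor_class_path('mpi4py')
print(module_name + '.' + class_name)
```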


TODO
====

The following are yet to be implemented:

    1. The processor cannot yet request command line arguments.

    2. The processor cannot yet be loaded from anywhere other than the multi directory.

"""

# Python module imports.
import datetime
import math
import sys
import time

# multi module imports.
from multi.misc import Capturing_exception, raise_unimplemented, Verbosity
from multi.result_queue import Threaded_result_queue
from multi.processor_io import Redirect_text
from multi.result_commands import Batched_result_command, Null_result_command, Result_exception
from multi.slave_commands import Slave_storage_command

# The module verbosity object.
verbosity = Verbosity()


    """A special Processor specific data storage container."""


    """The central class of the multi processor framework.

    This provides facilities for process management, command queueing, command scheduling, remote
    execution of commands, and handling of results and errors from commands.  The class is abstract
    and should be overridden to implement new interprocess communication methods.  However, even
    then users are encouraged to override the more fully implemented
    multi.multi_processor.Multi_processor class.  Most users should create instances of this class
    by calling the static method Processor.load_multiprocessor.

    The class is designed to be subclassed and has abstract methods that a subclass needs to
    override.  Methods which can be overridden are clearly marked with a note stating that they can
    be overridden.

    @todo:  It may be a good idea to separate out the features of the class that purely deal with
            the interprocess communication fabric.
    @todo:  The processor can't currently harvest the required command line arguments from the
            current command line.
    """


    def __init__(self, processor_size, callback):
        """Initialise the processor.

        @param processor_size:  The requested number of slave processors.  If the number of
                                processors is set by the environment (e.g. in the case of MPI via
                                'mpiexec -np <n-processors>' on the command line), the processor is
                                free to ignore this value.  The default value from the command line
                                is -1, and subclasses receiving this value should either raise an
                                exception or determine the correct number of slaves to create (e.g.
                                on a multi-cored machine using a threaded implementation the correct
                                number of slaves would be equal to the number of cores available).
        @type processor_size:   int
        @param callback:        The application callback which allows the host application to start
                                its main loop and handle exceptions from the processor.
        @type callback:         multi.processor.Application_callback instance
        """

        self.callback = callback
        """Callback to interface to the host application.

        @see:  Application_callback."""

        self.grainyness = 1
        """The number of sub jobs to queue for each processor if we have more jobs than processors."""

        self.NULL_RESULT = Null_result_command(processor=self)
        """Empty result command used by commands which do not return a result (a singleton?)."""

        self.data_store = Data_store()
        """The processor data store."""

        self._processor_size = processor_size
        """Number of slave processors available in this processor."""

        self.threaded_result_processing = True
        """Flag for the handling of result processing via self.run_command_queue()."""


        """Shutdown the multi processor in exceptional conditions - designed for overriding.

        This method is called after an exception from the master or slave has been raised and
        processed, and is responsible for the shutdown of the multi processor fabric and for
        terminating the application.  It should be called as the last thing that
        Application_callback.handle_exception does.

        As an example of the method's use, see Mpi4py_processor.abort, which calls
        MPI.COMM_WORLD.Abort() to cleanly shut down the MPI framework and remove dangling processes.

        The default action is to call the special self.exit() method.

        @see:  multi.processor.Application_callback.
        @see:  multi.mpi4py_processor.Mpi4py_processor.abort().
        @see:  mpi4py.MPI.COMM_WORLD.Abort().
        """

        self.exit()


        """Add a command for remote execution to the queue - an abstract method.

        @see:  multi.processor.Slave_command
        @see:  multi.processor.Result_command
        @see:  multi.processor.Memo

        @param command:     A command to execute on a slave processor.
        @type command:      ? subclass instance
        @keyword memo:      A place to store data needed on command completion (e.g. where to save
                            the results).  The data stored in the memo is provided to the
                            Result_commands generated by the submitted command.
        @type memo:         Memo subclass instance
        """

        raise_unimplemented(self.add_to_queue)


        """Make sure that this is the master processor and not a slave.

        @raises Exception:  If not on the master processor.
        """

        raise_unimplemented(self.assert_on_master)


    def exit(self, status=0):
        """Exit the processor with the given status.

        This default method allows the program to drop off the end and terminate as it normally
        would - i.e. this method does nothing.

        @keyword status:    The program exit status.
        @type status:       int
        """


        """Fetch the data structure of the given name from the data store.

        This can be run on the master or slave processors.


        @keyword name:  The name of the data structure to fetch.
        @type name:     str
        @return:        The value of the associated data structure.
        @rtype:         anything
        """

        # Get the object.
        obj = getattr(self.data_store, name)

        # Return the object.
        return obj


        """Get a string describing the multi processor - designed for overriding.

        The string should be suitable for display at application startup and should be less than 100
        characters wide.  A good example is the string returned by mpi4py_processor:

        >>> MPI running via mpi4py with <n> slave processors & 1 master, mpi version = <x>.<y>

        @see:  multi.processor.mpi4py_processor.Mpi4py_processor.get_intro_string.

        @return:    A string describing the multi processor.
        @rtype:     str
        """

        raise_unimplemented(self.get_intro_string)


        """Get the name of the current processor - an abstract method.

        The string should identify the current master or slave processor uniquely but is purely for
        information and debugging.  For example, the MPI implementation uses the string
        <host-name>-<process-id>, whereas the thread implementation uses the id of the current
        thread as provided by Python.

        @return:    The processor identifier.
        @rtype:     str
        """

        raise_unimplemented(self.get_name)


        """Get the strings used to prepend STDOUT and STDERR, dependent on the current rank.

        For processors with only one slave the result should be ('', '') - designed for overriding.

        @note:  As implemented here, each string is a rank prefix followed by ' | ' for STDOUT and
                ' E| ' for STDERR.  The prefix is a run of 'M' characters on the master and the
                formatted rank on the slaves.  Empty strings are returned if the verbosity level is
                zero.

        @return:    A pair of strings for prepending to each line of STDOUT and STDERR.
        @rtype:     tuple of 2 str
        """

        # Only prepend test if the verbosity level is set.
        if not verbosity.level():
            return '', ''

        # Initialise.
        pre_string = ''
        stdout_string = ''
        stderr_string = ''
        rank = self.rank()

        # The slave prefix is the formatted rank.
        if self.processor_size() > 1 and rank > 0:
            pre_string = self.rank_format_string() % rank

        # The master prefix is a run of 'M' characters of the same width.
        elif self.processor_size() > 1 and rank == 0:
            pre_string = 'M'*self.rank_format_string_width()

        # Add the stream markers.
        if self.processor_size() > 1:
            stderr_string = pre_string + ' E| '
            stdout_string = pre_string + ' | '

        # Return the strings to prepend to the STDOUT and STDERR streams.
        return stdout_string, stderr_string


        """Utility function called to format the difference between application start and end times.

        @todo:  Check my format is correct.

        @param start_time:  The time the application started in seconds since the epoch.
        @type start_time:   float
        @param end_time:    The time the application ended in seconds since the epoch.
        @type end_time:     float
        @return:            The time difference in the format 'hours:minutes:seconds'.
        @rtype:             str
        """

        time_diff = end_time - start_time
        time_delta = datetime.timedelta(seconds=time_diff)

        # Drop the fractional seconds, if any (str() omits them when the microseconds are zero,
        # so unpacking the result of split() into two values would fail in that case).
        time_delta_str = str(time_delta).split('.', 1)[0]
        return time_delta_str


        """Slave to master processor data transfer - send the result command from the slave.

        This is invoked by the slave processor.


        @param command: The result command to send to the master.
        @type command:  Result_command instance
        @param dest:    The destination processor's rank.
        @type dest:     int
        """

        raise_unimplemented(self.master_queue_command)


        """Slave to master processor data transfer - receive the result command from the slave.

        This is invoked by the master processor.

        @return:    The result command sent by the slave.
        @rtype:     Result_command instance
        """

        raise_unimplemented(self.master_receive_result)


    def post_run(self):
        """Method called after the application main loop has finished - designed for overriding.

        The default implementation outputs the application runtime to STDOUT.  All subclasses should
        call the base method as their last action via super().  Only called on the master on normal
        exit from the application's run loop.
        """

        if self.rank() == 0:
            end_time = time.time()
            time_delta_str = self.get_time_delta(self.start_time, end_time)

            # Print out of the total run time.
            if verbosity.level():
                print('\nOverall runtime: ' + time_delta_str + '\n')


        """Method called before starting the application main loop - designed for overriding.

        The default implementation just saves the start time for application timing.  All subclasses
        should call the base method via super().  Only called on the master.
        """

        if self.rank() == 0:
            self.start_time = time.time()


        """Get the number of slave processors - designed for overriding.

        @return:    The number of slave processors.
        @rtype:     int
        """

        return self._processor_size


        """Get the rank of this processor - an abstract method.

        The rank of the processor should be a number between 0 and n, where n is the number of slave
        processors.  The rank of 0 is reserved for the master processor.

        @return:    The rank of the processor.
        @rtype:     int
        """

        raise_unimplemented(self.rank)


        """Return a result to the master processor from a slave - an abstract method.

        @param result:  A result to be returned to the master processor.
        @type result:   Result_string, Result_command or Exception instance

        @see:  multi.processor.Result_string.
        @see:  multi.processor.Result_command.
        """

        raise_unimplemented(self.return_object)


        """Run the processor - an abstract method.

        This function runs the processor main loop and is called after all processor setup has been
        completed.  It does remote execution setup and teardown (via self.pre_run() and
        self.post_run()) on either side of a call to Application_callback.init_master.

        @see:  multi.processor.Application_callback.
        """

        # Execute any setup code.
        self.pre_run()

        # Execution on the master processor.
        if self.on_master():
            # Start the main program via the application callback.
            try:
                self.callback.init_master(self)

            # Allow sys.exit() calls.
            except SystemExit:
                # Let the processor clean up.
                self.exit()

                # Continue with the sys.exit().
                raise

            # Pass all other errors to the application callback.
            except Exception as e:
                self.callback.handle_exception(self, e)

        # Execution on the slave processors.
        else:
            # Loop until the do_quit flag is set.
            while not self.do_quit:
                # Receive the commands and run them.
                try:
                    # Fetch the commands.
                    commands = self.slave_receive_commands()

                    # Convert to a list, if needed.
                    if not isinstance(commands, list):
                        commands = [commands]

                    # Initialise the results list.
                    if self.batched_returns:
                        self.result_list = []
                    else:
                        self.result_list = None

                    # Execute each command, one by one.
                    for i, command in enumerate(commands):
                        # Capture the standard IO streams.
                        self.stdio_capture()

                        # Set the completed flag if this is the last command.
                        completed = (i == len(commands)-1)

                        # Execute the command.
                        command.run(self, completed)

                        # Restore the standard IO streams.
                        self.stdio_restore()

                    # Return the batched results.
                    if self.batched_returns:
                        self.return_object(Batched_result_command(processor=self, result_commands=self.result_list, io_data=self.io_data))
                        self.result_list = None

                # Capture all slave exceptions and return them to the master.
                except:
                    capturing_exception = Capturing_exception(rank=self.rank(), name=self.get_name())
                    exception_result = Result_exception(exception=capturing_exception, processor=self, completed=True)

                    self.return_object(exception_result)
                    self.result_list = None

        # Execute any teardown code.
        self.post_run()

        # End of execution, so perform any exit actions.
        if self.on_master():
            self.exit()


        """Run the same command on all slave processors.

        @see:  multi.processor.processor.Slave_command.

        @param command: A slave command.
        @type command:  Slave_command instance
        """

        queue = [command for i in range(self.processor_size())]
        self.run_command_queue(queue)


        """Run the processor queue - an abstract method.

        All commands queued with add_to_queue will be executed.  This function causes the current
        thread to block until the commands have completed.
        """

        # Split the command queue into chunks and execute them.
        lqueue = self.chunk_queue(self.command_queue)
        self.run_command_queue(lqueue)

        # Empty the queue and the memo map.
        del self.command_queue[:]
        self.memo_map.clear()


        """Enable capture of the STDOUT and STDERR streams.

        This is currently used to capture the IO streams of the slaves to return back to the master.
        """

        # Store the original STDOUT and STDERR streams for restoring later on.
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr

        # The data object.
        self.io_data = []

        # Get the strings to prepend to the IO streams.
        pre_strings = self.get_stdio_pre_strings()

        # Then redirect the IO streams.
        sys.stdout = Redirect_text(self.io_data, token=pre_strings[0], stream=0)
        sys.stderr = Redirect_text(self.io_data, token=pre_strings[1], stream=1)


        """Restore the original STDOUT and STDERR streams."""

        # Restore the original streams.
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr