1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 from Queue import Queue
24 from Numeric import sum, zeros
25 from exceptions import Exception
26 from random import randint
27 from re import search
28 from string import ascii_letters
29 import sys
30 from time import sleep
31 from threading import Lock, Thread
32
33 from data import Element
34 from processes import RelaxPopen3
35
36
37
38
39
42 """Class containing functions for setting up and executing threading in relax."""
43 # Keep the main relax object; read() reaches IO and thread_data through it.
44 self.relax = relax
45
46
48 """Run relax in threading mode."""
49
50 # Execute the user's script (self.relax.script_file) in this interpreter with Python 2's execfile().
51 execfile(self.relax.script_file)
52
53
def read(self, file=None, dir=None):
    """Read a hosts file, then check and register one thread entry per listed CPU.

    Every line of the file must have six columns:

        host name, user name, program path, working directory, priority, number of CPUs

    A '-' in a column selects its default: 'localhost', no user, 'relax', '~/.relax',
    15 and 1 respectively.  Each line contributes one host_data entry per CPU.  The
    collected list is handed to RelaxHostParentThread, and the number of active
    threads is printed afterwards.

    @param file:    File name passed to self.relax.IO.extract_data().
    @param dir:     Directory passed to self.relax.IO.extract_data().
    @raise RelaxFileEmptyError:  If nothing is left after stripping the file data.
    @raise RelaxError:           If a line does not have exactly six columns.
    @raise RelaxIntError:        If the priority or the CPU count is not an integer.
    """
    reader = self.relax.IO
    lines = reader.strip(reader.extract_data(file, dir))
    if not lines:
        raise RelaxFileEmptyError

    # Value substituted for a '-' entry, by column position.
    defaults = ('localhost', None, 'relax', '~/.relax', 15, 1)

    self.host_data = []
    for columns in lines:
        if len(columns) != 6:
            raise RelaxError("The number of columns in the hosts file line " + repr(columns) + " should be six.")

        # Replace every '-' placeholder by the default for its column.
        values = []
        for position in range(6):
            if columns[position] == '-':
                values.append(defaults[position])
            else:
                values.append(columns[position])
        host_name, user, prog_path, swd, priority, num_cpus = values

        # The local machine needs neither a login address nor an ssh prefix.
        if host_name == 'localhost':
            login = login_cmd = None
        else:
            if user:
                login = user + '@' + host_name
            else:
                login = host_name
            login_cmd = 'ssh -o ForwardX11=no ' + login + ' '

        try:
            priority = int(priority)
        except ValueError:
            raise RelaxIntError('priority', priority)
        try:
            num_cpus = int(num_cpus)
        except ValueError:
            raise RelaxIntError('CPUs', num_cpus)

        # One independent entry per CPU.
        for cpu in xrange(num_cpus):
            self.host_data.append([host_name, user, login, login_cmd, prog_path, swd, priority])

    # The constructor runs the host checks; afterwards report how many threads are active.
    RelaxHostParentThread(self.relax, self.host_data)
    print("\nTotal number of active threads: " + repr(len(self.relax.thread_data.host_name)))
135
136
137
138
139
140
143 """Class containing all the thread data."""
144
145 # status becomes 1 once at least one host has passed its checks and been registered.
146 self.status = 0
147 self.host_name = [] # Parallel lists: entry i of each list holds the setting for worker thread i.
148 self.user = []
149 self.login = []
150 self.login_cmd = []
151 self.cp_cmd = [] # NOTE(review): cp_cmd is never appended to or read in the code shown here.
152 self.prog_path = []
153 self.swd = []
154 self.priority = []
155
156
157
158
159
160
163 """Parent class containing the main threading loop."""
164 # NOTE(review): self.relax is read but not assigned in this body; presumably provided by the class or a subclass - confirm.
165 # One worker thread per host registered in thread_data.
166 self.num_threads = len(self.relax.thread_data.host_name)
167
168
def run(self, tag=1, save_state=1):
    """Run the main threading loop.

    Job numbers 0 to self.num_jobs-1 are queued.  Then a RelaxResultsThread and
    self.num_threads worker threads (from self.thread_object(i)) are started.  The
    finished-job flags are polled every 0.2 s until all jobs are done, after which
    the threads are cleaned up.

    @param tag:         If true, generate a random eight-letter tag and store it in
                        self.tag.  The threads place their files in a directory of
                        that name inside their working directory.
    @param save_state:  If true, save the program state to 'save_<tag>.gz' so it can
                        be sent to the threads (self.tag must already be set).
    @raise KeyboardInterrupt:  Re-raised after every job has been marked finished
                               and the threads have been cleaned up.
    """
    # Generate a random tag.
    if tag:
        print("Generating a random tag:")
        self.tag = ''.join([ascii_letters[randint(0, len(ascii_letters)-1)] for i in xrange(8)])
        print(" %s\n" % self.tag)
        print("All files will be placed in the directory '%s' within the thread's working directory.\n" % self.tag)

    # Save the program state so the threads can load it.
    if save_state:
        self.save_state_file = 'save_%s.gz' % self.tag
        print("Saving the program state to send to the threads.")
        self.relax.generic.state.save(file=self.save_state_file, force=1, compress_type=2)

    # Queue every job number; completed job numbers come back on the results queue.
    self.job_queue = Queue()
    self.results_queue = Queue()
    for i in xrange(self.num_jobs):
        self.job_queue.put(i)

    # Per-job finished flags and one lock per job.
    self.finished_jobs = zeros(self.num_jobs)
    self.job_locks = [Lock() for i in xrange(self.num_jobs)]

    # The results thread flags jobs as finished as their numbers arrive.
    RelaxResultsThread(self.job_queue, self.results_queue, self.finished_jobs, self.job_locks).start()

    print("\nStarting all threads.\n")
    self.threads = []
    for i in xrange(self.num_threads):
        thread = self.thread_object(i)

        # Name the thread before starting it.  The worker loop builds its script
        # and log file names from getName(), so renaming after start() would race
        # with the thread body.
        thread.setName("Thread-" + repr(i))
        self.threads.append(thread)
        thread.start()

    try:
        # Poll until every job has been flagged as finished.
        while True:
            sleep(0.2)
            if sum(self.finished_jobs) == self.num_jobs:
                # None is the stop signal for the workers and the results thread.
                self.job_queue.put(None)
                self.results_queue.put(None)
                break

    except KeyboardInterrupt:
        for i in xrange(self.num_jobs):
            # Set (not compare) the flag so the workers see every job as finished.
            self.finished_jobs[i] = 1

            # Free any job lock still held so no thread stays blocked on it.
            if self.job_locks[i].locked():
                self.job_locks[i].release()

        self.thread_clean_up(print_flag=1)
        raise KeyboardInterrupt

    self.thread_clean_up()
256
257
259 """Function for cleaning up the threads."""
260
261
262 if print_flag:
263 print "Cleaning up threads."
264
265 # None is the stop signal for the results thread's loop.
266 self.results_queue.put(None)
267
268 # None also stops the workers; each worker puts it back so the others see it too.
269 self.job_queue.put(None)
270
271 # Kill every worker from its own RelaxKillThread, presumably so that slow (e.g. remote) kills proceed in parallel.
272 kill_threads = []
273 for i in xrange(self.num_threads):
274
275 kill_threads.append(RelaxKillThread(self.threads[i], print_flag))
276
277
278 kill_threads[i].start()
279
280 # Wait until every kill thread has finished. NOTE(review): this loop polls without sleeping (a busy-wait); a short sleep() per pass would spare the CPU.
281 while 1:
282
283 num_killed = 0
284 for i in xrange(self.num_threads):
285 if not kill_threads[i].isAlive():
286 num_killed = num_killed + 1
287
288
289 if num_killed == self.num_threads:
290 break
291
292 # Remove the temporary state file written by run() when save_state was set.
293 if hasattr(self, 'save_state_file'):
294 self.relax.IO.delete(file_name=self.save_state_file)
295
296
297
298
299
300
302 - def __init__(self, i, job_queue, results_queue, finished_jobs, job_locks):
303 """The base class of all threads in relax."""
304
305
306 Thread.__init__(self)
307
308
309 self.setDaemon(1)
310
311
312 self.job_queue = job_queue
313 self.results_queue = results_queue
314 self.finished_jobs = finished_jobs
315 self.job_locks = job_locks
316
317
318 self.host_name = self.relax.thread_data.host_name[i]
319 self.user = self.relax.thread_data.user[i]
320 self.login = self.relax.thread_data.login[i]
321 self.login_cmd = self.relax.thread_data.login_cmd[i]
322 self.prog_path = self.relax.thread_data.prog_path[i]
323 self.swd = self.relax.thread_data.swd[i]
324 self.priority = self.relax.thread_data.priority[i]
325
326
327 self.spawn_relax_flag = 1
328
329
330 self.kill_flag = 0
331
332
333 if not self.test_dir():
334 self.mkdir()
335
336
337 self.save_state_file = "%s/%s/save.gz" % (self.swd, self.tag)
338
339
340 if not self.test_save_file():
341 self.copy_save_file()
342
343
345 """Close all the stdin, stdout, and stderr pipes of the child (to flush the buffers)."""
346
347 # Each pipe is checked first, so this is harmless when some of them are already closed.
348 if not self.child.tochild.closed:
349 self.child.tochild.close()
350
351
352 if not self.child.fromchild.closed:
353 self.child.fromchild.close()
354
355
356 if not self.child.childerr.closed:
357 self.child.childerr.close()
358
359
361 """Copy the saved program state 'save_<tag>.gz' to '<swd>/<tag>/save.gz' on this thread's host (once off)."""
362
363 # cp for the local machine, scp to the login address otherwise; either way the command itself runs locally (remote_exe=0).
364 if self.host_name == 'localhost':
365 cmd = "cp -p save_%s.gz %s/%s/save.gz" % (self.tag, self.swd, self.tag)
366 else:
367 cmd = "scp -p save_%s.gz %s:%s/%s/save.gz" % (self.tag, self.login, self.swd, self.tag)
368 err = self.start_child(cmd=cmd, remote_exe=0, catch_err=1)
369
370 # Any stderr output is treated as a failed copy.
371 if len(err):
372 raise RelaxError, "The copy command `%s` could not be executed." % cmd
373
374
376 """Function for running an instance of relax in threading mode on the host machine."""
377
378 # Start relax with --thread at the configured nice priority; close=0 keeps the pipes open for reading below.
379 cmd = "nice -n %s %s --thread --log %s %s" % (self.priority, self.prog_path, self.log_file, self.script_file)
380 self.start_child(cmd=cmd, close=0)
381
382 # The first stdout line is taken as the PID of the child relax (newline stripped); presumably printed by relax in --thread mode - confirm.
383 self.child.relax_pid = self.child.fromchild.readline()
384 self.child.relax_pid = int(self.child.relax_pid[0:-1])
385
386 # All remaining stdout lines are stored as the job's results.
387 self.results = self.child.fromchild.readlines()
388
389
390 self.child.tochild.close()
391 self.child.fromchild.close()
392
393 # Echo any stderr output. NOTE(review): stderr is only drained after stdout reaches EOF; a child that fills the stderr pipe first could block - confirm.
394 err = self.child.childerr.readlines()
395 if len(err):
396 for line in err:
397 print line[0:-1]
398
399
400 self.child.childerr.close()
401
402
404 """Function for creating the directory 'self.tag' in the working directory."""
405
406 # Runs on the thread's host: start_child() wraps the command with login_cmd by default (remote_exe=1).
407 cmd = "mkdir %s/%s" % (self.swd, self.tag)
408 err = self.start_child(cmd=cmd, catch_err=1)
409
410 # Any stderr output means the directory could not be created.
411 if len(err):
412 raise RelaxError, "The directory `%s/%s` could not be created on %s." % (self.swd, self.tag, self.host_name)
413
414
423
424
426 """Return the string required for either local or remote execution of the command."""
427
428 # With a login command (e.g. the 'ssh -o ForwardX11=no <login> ' prefix built in read()), append cmd in double quotes. NOTE(review): cmd is not escaped, so embedded double quotes would break the quoting.
429 if login_cmd:
430 return login_cmd + " \"" + cmd + "\""
431
432 # No login command (local host): run cmd unchanged.
433 return cmd
434
435
436 - def run(self, expanded_flag=1):
437 """Main function for execution of the specific threading code."""
438 # expanded_flag is not referenced in this body.
439 # Loop until kill_flag is set or the None stop signal is taken from the job queue.
440 while 1:
441
442 if self.kill_flag:
443 break
444
445 # Blocks until a job number (or None) is available.
446 self.job_number = self.job_queue.get()
447
448 # None means stop: put it back so the other workers also see it, then leave the loop.
449 if self.job_number == None:
450
451 self.job_queue.put(None)
452
453
454 break
455
456 # Skip jobs that are already flagged as finished.
457 if self.finished_jobs[self.job_number] == 1:
458 continue
459
460 # Put the job straight back, presumably so another thread can take it over if this one fails; the lock and the finished re-check below prevent double processing.
461 self.job_queue.put(self.job_number)
462
463 # Any exception raised while handling the job goes to the bare except below.
464 try:
465 # Script and log file names include getName(), so the thread must already be named when it starts.
466 if self.spawn_relax_flag:
467
468 self.script_file = "%s/%s/script_%s_job_%s.py" % (self.swd, self.tag, self.getName(), self.job_number)
469 self.log_file = "%s/%s/%s_job_%s.log" % (self.swd, self.tag, self.getName(), self.job_number)
470
471
472 self.thread_run = 'job_%s' % self.job_number
473
474 # Hook run before the job lock is taken.
475 self.pre_locked_code()
476
477 # Per-job lock; it is released by the results thread (or kill()) once the job is flagged finished.
478 self.job_locks[self.job_number].acquire()
479
480 # The job may have been finished by another thread while this one waited for the lock.
481 if self.finished_jobs[self.job_number] == 1:
482
483 self.job_locks[self.job_number].release()
484
485
486 continue
487
488 # kill() may have been called while waiting for the lock.
489 if self.kill_flag:
490 break
491
492 # Hook run while holding the job lock.
493 self.post_locked_code()
494
495 # NOTE(review): bare except also catches SystemExit and KeyboardInterrupt raised in this thread.
496 except:
497
498 if self.kill_flag:
499 break
500
501 # Debug is not defined in the code shown here (presumably a global debugging flag - confirm); when true the error propagates.
502 if Debug:
503 raise
504
505 # Otherwise report the failure, back off for 300 s and move on to the next job.
506 print "%s, job %s on %s: failed. Thread sleeping for 5 minutes." % (self.getName(), self.job_number, self.host_name)
507
508
509 sleep(300)
510
511
512 continue
513
514 # Success: hand the job number to the results thread, which flags it as finished.
515 self.results_queue.put(self.job_number)
516
517
def start_child(self, cmd, catch_out=0, catch_err=0, remote_exe=1, close=1):
    """Start the child process and place it in 'self.child'.

    @param cmd:         Shell command to run.
    @param catch_out:   If true, read and return every stdout line of the child.
    @param catch_err:   If true, read and return every stderr line of the child.
    @param remote_exe:  If true, wrap cmd with self.remote_command() using self.login_cmd.
    @param close:       If true, close all of the child's pipes before returning.
    @return:            The list of captured lines, or None when nothing is captured.
    @raise RelaxError:  If both catch_out and catch_err are set.
    """
    # Only one stream may be captured (presumably because reading one pipe to EOF
    # while the other fills up can stall the child).
    if catch_out and catch_err:
        raise RelaxError("Cannot catch both stdout and stderr simultaneously, this causes racing.")

    if remote_exe:
        cmd = self.remote_command(cmd=cmd, login_cmd=self.login_cmd)

    self.child = RelaxPopen3(cmd, capturestderr=1)

    if catch_out:
        captured = self.child.fromchild.readlines()
    elif catch_err:
        captured = self.child.childerr.readlines()
    else:
        captured = None

    if close:
        self.close_all_pipes()

    return captured
549
550
551 - def kill(self, print_flag=0):
552 """Attempt to kill the thread."""
553
554 # The run() loop checks kill_flag and stops.
555 self.kill_flag = 1
556
557 # Kill the child process if one was ever started (self.child is the RelaxPopen3 object from start_child()).
558 if hasattr(self, 'child'):
559 self.child.kill(login_cmd=self.login_cmd)
560
561 # Flag the current job as finished and free its lock so no other thread waits on it.
562 if hasattr(self, 'job_number') and self.job_number != None:
563
564 self.finished_jobs[self.job_number] = 1
565
566
567 if self.job_locks[self.job_number].locked():
568 self.job_locks[self.job_number].release()
569
570
571 if print_flag:
572 print "%s, job %s on %s terminated." % (self.getName(), self.job_number, self.host_name)
573
574
576 """Function for testing if the directory corresponding to tag exists."""
577
578 # Runs 'ls' on the thread's host; any stderr output is taken to mean the directory is missing.
579 cmd = "ls %s/%s" % (self.swd, self.tag)
580 err = self.start_child(cmd=cmd, catch_err=1)
581
582 # Returns 0 when missing, 1 when present.
583 if len(err):
584 return 0
585
586
587 else:
588 return 1
589
590
592 """Function for testing if the saved program state file (self.save_state_file) is already copied."""
593
594 # Runs 'ls' on the thread's host; any stderr output is taken to mean the file is missing.
595 cmd = "ls %s" % self.save_state_file
596 err = self.start_child(cmd=cmd, catch_err=1)
597
598 # Returns 0 when missing, 1 when present.
599 if len(err):
600 return 0
601
602
603 else:
604 return 1
605
606
607
608
609
610
611
def __init__(self, job_queue, results_queue, finished_jobs, job_locks):
    """The thread for collecting results.

    Only stores the shared job state.  The thread's run() takes completed job
    numbers from results_queue, sets their finished_jobs flags and releases
    their locks.

    @param job_queue:      Shared queue of job numbers (stored but not read by run()).
    @param results_queue:  Queue of completed job numbers; None stops the thread.
    @param finished_jobs:  Shared per-job finished flags.
    @param job_locks:      Shared per-job locks.
    """
    Thread.__init__(self)

    # Daemon thread, so it does not keep the interpreter alive on its own.
    self.setDaemon(1)

    shared = (('job_queue', job_queue), ('results_queue', results_queue),
              ('finished_jobs', finished_jobs), ('job_locks', job_locks))
    for attr, value in shared:
        setattr(self, attr, value)
627
628
630 """Main function for execution of the specific threading code."""
631
632 # Collect completed job numbers until the None stop signal arrives.
633 while 1:
634
635 job_number = self.results_queue.get()
636
637
638 if job_number == None:
639 break
640
641 # Flag the job finished (the parent's main loop polls finished_jobs) and free its lock if still held.
642 self.finished_jobs[job_number] = 1
643
644
645 if self.job_locks[job_number].locked():
646 self.job_locks[job_number].release()
647
648
649
650
651
652
653
def __init__(self, thread, print_flag):
    """The thread for killing a single worker thread.

    @param thread:      The worker thread whose kill() method this thread's run() calls.
    @param print_flag:  Passed through to thread.kill(); when true, kill() prints a
                        termination message.
    """
    Thread.__init__(self)

    # Daemon thread, so it does not keep the interpreter alive on its own.
    self.setDaemon(1)

    self.thread = thread
    self.print_flag = print_flag
667
668
670 """Main function for execution of the specific threading code."""
671
672 # Delegate to the target thread's kill(), passing the print flag through.
673 self.thread.kill(self.print_flag)
674
675
676
677
678
679
680
683 """Threaded threading setup (for faster ssh responses)."""
684
685 # One host-check job per host_data entry (read() adds one entry per CPU).
686 self.relax = relax
687 self.host_data = host_data
688
689
690 self.num_threads = self.num_jobs = len(self.host_data)
691
692 # Run the checks immediately: run() only returns once every job is finished. No tag or state file is needed.
693 self.run(tag=0, save_state=0)
694
695
697 """Function for returning an initialised thread object."""
698
699 # Every host-check thread gets the whole host_data list and the job number selects the entry; the index passed by run() is not used here.
700 return RelaxHostThread(self.relax, self.job_queue, self.results_queue, self.finished_jobs, self.job_locks, self.host_data)
701
702
703
704
705
706
def __init__(self, relax, job_queue, results_queue, finished_jobs, job_locks, host_data):
    """Initialisation of the thread.

    Only stores its arguments and sets the flags; no host is contacted here.
    setDaemon() is not called, so this is a normal (non-daemon) thread.

    @param relax:          The main relax object.
    @param job_queue:      Shared queue of job numbers.
    @param results_queue:  Shared queue of completed job numbers.
    @param finished_jobs:  Shared per-job finished flags.
    @param job_locks:      Shared per-job locks.
    @param host_data:      Host entries, indexed by job number.
    """
    Thread.__init__(self)

    self.relax, self.host_data = relax, host_data

    # Shared job state.
    self.job_queue, self.results_queue = job_queue, results_queue
    self.finished_jobs, self.job_locks = finished_jobs, job_locks

    # The worker run() loop only builds script/log file names when spawn_relax_flag
    # is set; kill() sets kill_flag to stop the loop.
    self.spawn_relax_flag, self.kill_flag = 0, 0
727
728
730 """Code to run prior to locking the job."""
731
732 # Unpack this job's host entry (read() adds one entry per CPU listed in the hosts file).
733 self.host_name, self.user, self.login, self.login_cmd, self.prog_path, self.swd, self.priority = self.host_data[self.job_number]
734
735
736 self.fail = 0
737
738 # Build the host report; it is printed later, after the job lock is taken.
739 self.text = []
740 self.text.append("%-20s%-10s" % ("Host name:", self.host_name))
741 self.text.append("%-20s%-10s" % ("User name:", self.user))
742 self.text.append("%-20s%-10s" % ("Program path:", self.prog_path))
743 self.text.append("%-20s%-10s" % ("Working directory:", self.swd))
744 self.text.append("%-20s%-10i" % ("Priority:", self.priority))
745
746 # Checks run in order and stop at the first failure: SSH (remote hosts only), working directory, relax itself.
747 if self.host_name != 'localhost' and not self.fail and not self.test_ssh():
748 self.fail = 1
749
750
751 if not self.fail and not self.test_wd():
752 self.fail = 1
753
754
755 if not self.fail and not self.test_relax():
756 self.fail = 1
757
758 # Only a host that passed every check is reported as OK.
759 if not self.fail:
760 self.text.append("%-20s%-10s" % ("Host status:", "[ OK ]"))
761
762
764 """Code to run after locking the job."""
765
766 # Print the host report built before the lock was taken.
767 for line in self.text:
768 print line
769 print "\n"
770
771 # Register a host that passed its checks in the shared thread_data lists.
772 if not self.fail:
773
774 if not self.relax.thread_data.status:
775 self.relax.thread_data.status = 1
776
777 # NOTE(review): job locks are per job, so two host threads can run these appends concurrently and interleave them, misaligning the parallel lists - confirm whether a shared lock is needed.
778 self.relax.thread_data.host_name.append(self.host_name)
779 self.relax.thread_data.user.append(self.user)
780 self.relax.thread_data.login.append(self.login)
781 self.relax.thread_data.login_cmd.append(self.login_cmd)
782 self.relax.thread_data.prog_path.append(self.prog_path)
783 self.relax.thread_data.swd.append(self.swd)
784 self.relax.thread_data.priority.append(self.priority)
785
786
788 """Function for testing if the program path is valid and that relax can execute."""
789
790 # Run 'relax --test' on the host; any stderr output counts as failure and is added to the report.
791 cmd = "%s --test" % self.prog_path
792 err = self.start_child(cmd=cmd, catch_err=1)
793
794
795 if len(err):
796 # NOTE(review): for localhost self.login is None, so this message reads "on None".
797 self.text.append("Cannot execute relax on %s using the program path %s" % (self.login, `self.prog_path`))
798 for line in err:
799 self.text.append(line[0:-1])
800
801
802 return 0
803
804
805 else:
806 return 1
807
808
810 """Function for testing the SSH connection and public key authentication."""
811
812 # PasswordAuthentication=no disables password logins, so the test relies on key-based authentication; the command runs locally (remote_exe=0) with the pipes left open (close=0).
813 cmd = "ssh -o PasswordAuthentication=no -o ForwardX11=no %s \"echo 'relax> ssh ok'\"" % self.login
814 self.start_child(cmd=cmd, remote_exe=0, close=0)
815
816 # Success if the echoed marker comes back on stdout.
817 out = self.child.fromchild.readlines()
818 for line in out:
819 if search('relax> ssh ok', line):
820 return 1
821
822 # Otherwise explain the failure from stderr. NOTE(review): the child's pipes are never closed here, and the method falls through returning None (falsy, so callers still treat it as failure).
823 err = self.child.childerr.readlines()
824 if len(err):
825
826 self.text.append("Cannot establish a SSH connection to %s." % self.login)
827
828 # Flag a key problem if ssh reported 'Permission denied'. NOTE(review): the message below has a typo ("authenication").
829 key_auth = 1
830 for line in err:
831 if search('Permission denied', line):
832 key_auth = 0
833 if not key_auth:
834 self.text.append("Public key authenication failed.")
835
836 # Append the raw stderr lines to the report.
837 for line in err:
838 self.text.append(line[0:-1])
839
840
842 """Function for testing if the working directory on the host machine exists."""
843
844 # Run a shell 'test -d' on the host; 'OK' on stdout means the directory exists.
845 cmd = "if test -d %s; then echo 'OK'; fi" % self.swd
846 out = self.start_child(cmd=cmd, catch_out=1)
847
848 # NOTE(review): any stdout line containing 'OK' (e.g. from a login banner) would also count as success.
849 for line in out:
850 if search('OK', line):
851 return 1
852
853 # Not found: record the problem in the host report.
854 self.text.append("Cannot find the working directory %s on %s." % (self.swd, self.host_name))
855 return 0
856