Module thread_classes
[hide private]
[frames] | no frames]

Source Code for Module thread_classes

  1  ############################################################################### 
  2  #                                                                             # 
  3  # Copyright (C) 2004 Edward d'Auvergne                                        # 
  4  #                                                                             # 
  5  # This file is part of the program relax.                                     # 
  6  #                                                                             # 
  7  # relax is free software; you can redistribute it and/or modify               # 
  8  # it under the terms of the GNU General Public License as published by        # 
  9  # the Free Software Foundation; either version 2 of the License, or           # 
 10  # (at your option) any later version.                                         # 
 11  #                                                                             # 
 12  # relax is distributed in the hope that it will be useful,                    # 
 13  # but WITHOUT ANY WARRANTY; without even the implied warranty of              # 
 14  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               # 
 15  # GNU General Public License for more details.                                # 
 16  #                                                                             # 
 17  # You should have received a copy of the GNU General Public License           # 
 18  # along with relax; if not, write to the Free Software                        # 
 19  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA   # 
 20  #                                                                             # 
 21  ############################################################################### 
 22   
 23  from Queue import Queue 
 24  from Numeric import sum, zeros 
 25  from exceptions import Exception 
 26  from random import randint 
 27  from re import search 
 28  from string import ascii_letters 
 29  import sys 
 30  from time import sleep 
 31  from threading import Lock, Thread 
 32   
 33  from data import Element 
 34  #from processes import RelaxPopen3 
 35   
 36   
 37  # Class for setting up threading. 
 38  ################################# 
 39   
class Threading:
    """Class containing functions for setting up and executing threading in relax."""

    def __init__(self, relax):
        """Store the relax object.

        relax:  The object whose 'script_file', 'IO' and 'thread_data' attributes are used by
                the methods of this class.
        """

        self.relax = relax


    def execute(self):
        """Run relax in threading mode by executing the script 'self.relax.script_file'."""

        # The script is run through the Python 2 builtin execfile().
        execfile(self.relax.script_file)


    def read(self, file=None, dir=None):
        """Read a hosts file, then start the threaded testing of every host.

        Each line of the hosts file must have six columns:  host name, user name, program path,
        working directory, priority, and number of CPUs.  A column containing '-' takes its
        default value ('localhost', no user, 'relax', '~/.relax', 15, and 1 respectively).

        file:  The hosts file, handed to self.relax.IO.extract_data().
        dir:   The directory argument, handed to self.relax.IO.extract_data().

        Raises RelaxFileEmptyError if no data is extracted, RelaxError if a line does not have six
        columns, and RelaxIntError if the priority or CPU count is not an integer.
        """

        # Extract and strip the file contents.
        io = self.relax.IO
        file_data = io.strip(io.extract_data(file, dir))

        # Nothing was extracted.
        if not file_data:
            raise RelaxFileEmptyError

        # Build the host data structure, one entry per CPU.
        self.host_data = []
        for columns in file_data:
            # Every line must have exactly six columns.
            if len(columns) != 6:
                raise RelaxError("The number of columns in the hosts file line " + repr(columns) + " should be six.")

            # Split the line into its fields.
            host_name, user, prog_path, swd, priority, num_cpus = columns

            # Replace the '-' placeholders of the string fields.
            if host_name == '-':
                host_name = 'localhost'
            if user == '-':
                user = None
            if prog_path == '-':
                prog_path = 'relax'
            if swd == '-':
                swd = '~/.relax'

            # The login string and ssh command prefix (neither is needed for the local machine).
            if host_name == 'localhost':
                login = None
                login_cmd = None
            else:
                if user:
                    login = user + '@' + host_name
                else:
                    login = host_name
                login_cmd = 'ssh -o ForwardX11=no ' + login + ' '

            # The priority as an integer.
            if priority == '-':
                priority = 15
            try:
                priority = int(priority)
            except ValueError:
                raise RelaxIntError('priority', priority)

            # The CPU count as an integer.
            if num_cpus == '-':
                num_cpus = 1
            try:
                num_cpus = int(num_cpus)
            except ValueError:
                raise RelaxIntError('CPUs', num_cpus)

            # One separate entry per CPU of the host.
            for cpu in xrange(num_cpus):
                self.host_data.append([host_name, user, login, login_cmd, prog_path, swd, priority])

        # Threaded threading setup (for faster ssh responses).
        RelaxHostParentThread(self.relax, self.host_data)

        # Final print out.
        print("\nTotal number of active threads: " + repr(len(self.relax.thread_data.host_name)))



# Class containing the thread data.
###################################

class ThreadData(Element):
    """Class containing all the thread data."""

    def __init__(self):
        """Initialise the status flag to zero and every per-host list to empty."""

        # The status starts at zero.
        self.status = 0

        # The host data for threading, each attribute starting as its own empty list.
        for name in ('host_name', 'user', 'login', 'login_cmd', 'cp_cmd', 'prog_path', 'swd', 'priority'):
            setattr(self, name, [])



# The parent class containing the main threading loop.
#######################################################

class RelaxParentThread:
    """Parent class containing the main threading loop.

    The methods of this class read 'self.relax' and 'self.num_jobs' and call
    'self.thread_object(i)', none of which are defined here.
    """

    def __init__(self):
        """Set the number of threads to the number of hosts in self.relax.thread_data."""

        # The number of threads.
        self.num_threads = len(self.relax.thread_data.host_name)


    def run(self, tag=1, save_state=1):
        """Run the main threading loop.

        tag:         If true, a random eight letter tag is generated and stored in self.tag.
        save_state:  If true, the program state is saved to the file 'save_<tag>.gz' (this reads
                     self.tag).

        The loop polls every 0.2 seconds until all jobs are flagged as finished, then cleans up the
        threads.  A KeyboardInterrupt flags all jobs as finished, releases the job locks, cleans
        up the threads, and is then raised again.
        """

        # Generate a random string tag to add to all thread files.
        if tag:
            print("Generating a random tag:")
            letters = [ascii_letters[randint(0, len(ascii_letters)-1)] for i in xrange(8)]
            self.tag = ''.join(letters)
            print("   %s\n" % self.tag)
            print("All files will be placed in the directory '%s' within the thread's working directory.\n" % self.tag)

        # Save the program state.
        if save_state:
            self.save_state_file = 'save_%s.gz' % self.tag
            print("Saving the program state to send to the threads.")
            self.relax.generic.state.save(file=self.save_state_file, force=1, compress_type=2)

        # Initialise the job and results queues.
        self.job_queue = Queue()
        self.results_queue = Queue()

        # Fill the job queue with the job indices.
        for i in xrange(self.num_jobs):
            self.job_queue.put(i)

        # Initialise an array of finished jobs.
        self.finished_jobs = zeros(self.num_jobs)

        # Initialise an array of locks for the jobs.
        self.job_locks = [Lock() for i in xrange(self.num_jobs)]

        # Start the results thread.
        RelaxResultsThread(self.job_queue, self.results_queue, self.finished_jobs, self.job_locks).start()

        # Start all sub-threads.
        print("\nStarting all threads.\n")
        self.threads = []
        for i in xrange(self.num_threads):
            # Instantiate and place the thread in 'self.threads'.
            self.threads.append(self.thread_object(i))

            # Start the thread.
            self.threads[i].start()

            # Name the thread.
            self.threads[i].setName("Thread-" + repr(i))

        # The main loop.
        try:
            while 1:
                # Sleep for 0.2 seconds.
                sleep(0.2)

                # All jobs have finished.
                if sum(self.finished_jobs) == self.num_jobs:
                    # Add None to the job_queue to signal the threads to finish.
                    self.job_queue.put(None)

                    # Add None to the results_queue to signal to the results thread to finish.
                    self.results_queue.put(None)

                    # Break the main loop.
                    break

        # Catch the keyboard interrupt signal.
        except KeyboardInterrupt:
            # Loop over all jobs.
            for i in xrange(self.num_jobs):
                # Tell the threads that all jobs have finished (this must be an assignment, a
                # comparison here would leave the array untouched).
                self.finished_jobs[i] = 1

                # Release the job locks.
                if self.job_locks[i].locked():
                    self.job_locks[i].release()

            # Thread clean up function.
            self.thread_clean_up(print_flag=1)

            # Reraise the keyboard interrupt signal.
            raise KeyboardInterrupt

        # Thread clean up function.
        self.thread_clean_up()


    def thread_clean_up(self, print_flag=0):
        """Function for cleaning up the threads.

        print_flag:  If true, a message is printed here and the flag is passed on to the kill
                     function of every thread.

        The termination signal None is placed on both queues, all threads in self.threads are
        killed, and the saved state file (if one was created) is deleted.
        """

        # Print out.
        if print_flag:
            print("Cleaning up threads.")

        # Place None onto the results queue to terminate the results thread.
        self.results_queue.put(None)

        # Place None onto the jobs queue to terminate the threads.
        self.job_queue.put(None)

        # Kill all threads, each from its own kill thread so that the kills run concurrently.
        kill_threads = []
        for i in xrange(self.num_threads):
            kill_thread = RelaxKillThread(self.threads[i], print_flag)
            kill_thread.start()
            kill_threads.append(kill_thread)

        # Hang until all the kill threads have finished.
        while 1:
            still_running = [thread for thread in kill_threads if thread.isAlive()]
            if not still_running:
                break

            # Pause between polls rather than spinning at full CPU load.
            sleep(0.1)

        # Delete the saved state file.
        if hasattr(self, 'save_state_file'):
            self.relax.IO.delete(file_name=self.save_state_file)



# The relax threading base class.
#################################

class RelaxThread(Thread):
    """The base class of all threads in relax.

    The methods read 'self.relax' and 'self.tag' and call 'self.generate_script()' and
    'self.post_locked_code()', none of which are defined in this class.
    """

    def __init__(self, i, job_queue, results_queue, finished_jobs, job_locks):
        """Set up the thread for host number i.

        i:              The index into the lists of self.relax.thread_data.
        job_queue:      The queue from which job numbers are taken.
        results_queue:  The queue onto which the numbers of completed jobs are placed.
        finished_jobs:  The array of flags, indexed by job number, with 1 meaning finished.
        job_locks:      The list of locks, indexed by job number.

        The tag directory and the saved state file are created in the thread's working directory
        if the 'ls' tests cannot find them.
        """

        # Run the Thread __init__ function (this is 'asserted' by the Thread class).
        Thread.__init__(self)

        # Set as a daemon.
        self.setDaemon(1)

        # Arguments.
        self.job_queue = job_queue
        self.results_queue = results_queue
        self.finished_jobs = finished_jobs
        self.job_locks = job_locks

        # Specific thread details.
        thread_data = self.relax.thread_data
        self.host_name = thread_data.host_name[i]
        self.user = thread_data.user[i]
        self.login = thread_data.login[i]
        self.login_cmd = thread_data.login_cmd[i]
        self.prog_path = thread_data.prog_path[i]
        self.swd = thread_data.swd[i]
        self.priority = thread_data.priority[i]

        # Flag for when relax will be spawned on the host machine.
        self.spawn_relax_flag = 1

        # Kill flag.
        self.kill_flag = 0

        # Make the directory with the name of tag in the thread's working directory if it doesn't exist.
        if not self.test_dir():
            self.mkdir()

        # Save state file.
        self.save_state_file = "%s/%s/save.gz" % (self.swd, self.tag)

        # Copy the saved state file to the thread's working directory once during initialisation.
        if not self.test_save_file():
            self.copy_save_file()


    def close_all_pipes(self):
        """Close all the stdin, stdout, and stderr pipes of the child (to flush the buffers)."""

        # Close stdin, stdout, and stderr in turn, skipping those already closed.
        for name in ('tochild', 'fromchild', 'childerr'):
            pipe = getattr(self.child, name)
            if not pipe.closed:
                pipe.close()


    def copy_save_file(self):
        """Function for the once off copying of the saved state file to the thread's wd.

        Raises RelaxError if the copy command writes anything to stderr.
        """

        # The destination is local (cp) or remote (scp).
        if self.host_name == 'localhost':
            cmd = "cp -p save_%s.gz %s/%s/save.gz" % (self.tag, self.swd, self.tag)
        else:
            cmd = "scp -p save_%s.gz %s:%s/%s/save.gz" % (self.tag, self.login, self.swd, self.tag)

        # The copy is always launched from this machine, hence remote_exe=0.
        err = self.start_child(cmd=cmd, remote_exe=0, catch_err=1)

        # The file could not be copied.
        if err:
            raise RelaxError("The copy command `%s` could not be executed." % cmd)


    def exec_relax(self):
        """Function for running an instance of relax in threading mode on the host machine.

        The first line of the child's stdout is stored as the integer self.child.relax_pid and the
        remaining lines in self.results.  Anything on the child's stderr is printed.
        """

        # Command.
        cmd = "nice -n %s %s --thread --log %s %s" % (self.priority, self.prog_path, self.log_file, self.script_file)
        self.start_child(cmd=cmd, close=0)
        child = self.child

        # Catch the PID (the first line of output, less its final character).
        child.relax_pid = child.fromchild.readline()
        child.relax_pid = int(child.relax_pid[0:-1])

        # Catch the results.
        self.results = child.fromchild.readlines()

        # Close the stdin and stdout pipes.
        child.tochild.close()
        child.fromchild.close()

        # Errors.
        for line in child.childerr.readlines():
            print(line[0:-1])

        # Close the error pipe.
        child.childerr.close()


    def mkdir(self):
        """Function for creating the directory 'self.tag' in the working directory.

        Raises RelaxError if the mkdir command writes anything to stderr.
        """

        # Command for creating the directory.
        err = self.start_child(cmd="mkdir %s/%s" % (self.swd, self.tag), catch_err=1)

        # Cannot make the directory.
        if err:
            raise RelaxError("The directory `%s/%s` could not be created on %s." % (self.swd, self.tag, self.host_name))


    def pre_locked_code(self):
        """Generic function for the pre-locked code."""

        # Generate the script file, then execute relax to run it.
        self.generate_script()
        self.exec_relax()


    def remote_command(self, cmd, login_cmd):
        """Return the string required for either local or remote execution of the command.

        cmd:        The command string.
        login_cmd:  The login command prefix.  If false, cmd is returned unchanged, otherwise cmd
                    is placed in double quotes after the prefix.
        """

        # Local execution.
        if not login_cmd:
            return cmd

        # Remote execution.
        return login_cmd + " \"" + cmd + "\""


    def run(self, expanded_flag=1):
        """Main function for execution of the specific threading code.

        Job numbers are taken off the job queue until None is found or the kill flag is set.  Each
        job is placed back into the queue before it is run, and its number is placed into the
        results queue once the post locked code has completed.

        expanded_flag:  Not used by this function.
        """

        # Run until all results are returned.
        while 1:
            # Catch the kill signal.
            if self.kill_flag:
                break

            # Get the job number for the next queued job.
            self.job_number = self.job_queue.get()

            # None is the signal for when all jobs have been completed.
            if self.job_number is None:
                # Place None back into the job queue so that all the other waiting threads will terminate.
                self.job_queue.put(None)

                # Thread termination (breaking of the while loop).
                break

            # Job termination if finished by a faster thread.
            if self.finished_jobs[self.job_number] == 1:
                continue

            # Place the job back into the job queue.  This is to make the threads fail safe and so
            # that idle faster threads will pick up the jobs of the slower threads.
            self.job_queue.put(self.job_number)

            # Catch all exceptions in the specific code.
            try:
                # Names which are only needed when relax is spawned.
                if self.spawn_relax_flag:
                    # Script and log files.
                    self.script_file = "%s/%s/script_%s_job_%s.py" % (self.swd, self.tag, self.getName(), self.job_number)
                    self.log_file = "%s/%s/%s_job_%s.log" % (self.swd, self.tag, self.getName(), self.job_number)

                    # Thread run name.
                    self.thread_run = 'job_%s' % self.job_number

                # Run the specific code prior to locking the job.
                self.pre_locked_code()

                # Lock the job.
                job_lock = self.job_locks[self.job_number]
                job_lock.acquire()

                # Job termination if finished by a faster thread.
                if self.finished_jobs[self.job_number] == 1:
                    # Release the lock.
                    job_lock.release()

                    # Job termination without running the post locked code or placing the job number in the results queue.
                    continue

                # Catch the kill signal.
                if self.kill_flag:
                    break

                # Run the specific code after locking the job.
                self.post_locked_code()

            # All other errors.
            except:
                # Catch a kill.
                if self.kill_flag:
                    break

                # Reraise the exception if in debugging mode.
                if Debug:
                    raise

                # Print out.
                print("%s, job %s on %s: failed.  Thread sleeping for 5 minutes." % (self.getName(), self.job_number, self.host_name))

                # Sleep for 5 min.
                sleep(300)

                # Skip to the next job.
                continue

            # Place the job number into the results queue.
            self.results_queue.put(self.job_number)


    def start_child(self, cmd, catch_out=0, catch_err=0, remote_exe=1, close=1):
        """Start the child process and place it in 'self.child'.

        cmd:         The command to run.
        catch_out:   If true, the lines of the child's stdout are returned.
        catch_err:   If true, the lines of the child's stderr are returned.
        remote_exe:  If true, the command is passed through self.remote_command() first.
        close:       If true, all the pipes of the child are closed before returning.

        Returns the list of caught lines, or None if neither catch flag is set.  Raises RelaxError
        if both catch flags are set.
        """

        # Disallow racing.
        if catch_out and catch_err:
            raise RelaxError("Cannot catch both stdout and stderr simultaneously, this causes racing.")

        # Modify the command for remote execution if necessary.
        if remote_exe:
            cmd = self.remote_command(cmd=cmd, login_cmd=self.login_cmd)

        # Open the pipes.
        self.child = RelaxPopen3(cmd, capturestderr=1)

        # Read the output or the errors (never both, see above).
        text = None
        if catch_out:
            text = self.child.fromchild.readlines()
        elif catch_err:
            text = self.child.childerr.readlines()

        # Close all pipes.
        if close:
            self.close_all_pipes()

        # Return the caught text.
        return text


    def kill(self, print_flag=0):
        """Attempt to kill the thread.

        print_flag:  If true, a termination message is printed for the active job.

        The kill flag is set, the child process (if one exists) is killed, and the current job
        (if any) is flagged as finished with its lock released.
        """

        # Set the thread's kill flag.
        self.kill_flag = 1

        # Kill the child process.
        if hasattr(self, 'child'):
            self.child.kill(login_cmd=self.login_cmd)

        # No active job, nothing more to do.
        if not hasattr(self, 'job_number') or self.job_number is None:
            return

        # Set the job to finished.
        self.finished_jobs[self.job_number] = 1

        # Release the job lock.
        job_lock = self.job_locks[self.job_number]
        if job_lock.locked():
            job_lock.release()

        # Print out.
        if print_flag:
            print("%s, job %s on %s terminated." % (self.getName(), self.job_number, self.host_name))


    def test_dir(self):
        """Function for testing if the directory corresponding to tag exists.

        Returns 0 if the 'ls' command writes anything to stderr, otherwise 1.
        """

        # List the directory, catching the errors.
        err = self.start_child(cmd="ls %s/%s" % (self.swd, self.tag), catch_err=1)

        # No directory.
        if err:
            return 0

        # Directory exists.
        return 1


    def test_save_file(self):
        """Function for testing if the saved state file is already copied.

        Returns 0 if the 'ls' command writes anything to stderr, otherwise 1.
        """

        # List the file, catching the errors.
        err = self.start_child(cmd="ls %s" % self.save_state_file, catch_err=1)

        # No file.
        if err:
            return 0

        # File exists.
        return 1




# The results thread.
#####################

class RelaxResultsThread(Thread):
    """The thread for collecting results."""

    def __init__(self, job_queue, results_queue, finished_jobs, job_locks):
        """Store the shared queues, finished job flags, and job locks.

        job_queue:      The job queue (stored, but not used by this class).
        results_queue:  The queue from which the numbers of completed jobs are taken.
        finished_jobs:  The array of flags, indexed by job number, set to 1 on completion.
        job_locks:      The list of locks, indexed by job number.
        """

        # Run the Thread __init__ function (this is 'asserted' by the Thread class).
        Thread.__init__(self)

        # Set as a daemon.
        self.setDaemon(1)

        # Arguments.
        self.job_queue, self.results_queue = job_queue, results_queue
        self.finished_jobs, self.job_locks = finished_jobs, job_locks


    def run(self):
        """Flag each job taken off the results queue as finished, stopping when None is found."""

        # Hang on the queue until the next result arrives, then quit on the None signal.
        job_number = self.results_queue.get()
        while job_number is not None:
            # Update the finished jobs.
            self.finished_jobs[job_number] = 1

            # Release the lock (finished jobs must be updated first).
            lock = self.job_locks[job_number]
            if lock.locked():
                lock.release()

            # The next result.
            job_number = self.results_queue.get()




# The thread for killing a thread.
##################################

class RelaxKillThread(Thread):
    """The thread for killing a thread."""

    def __init__(self, thread, print_flag):
        """Store the thread to kill.

        thread:      The object whose kill() function will be called by run().
        print_flag:  The flag handed on to that kill() function.
        """

        # Run the Thread __init__ function (this is 'asserted' by the Thread class).
        Thread.__init__(self)

        # Set as a daemon.
        self.setDaemon(1)

        # Arguments.
        self.thread, self.print_flag = thread, print_flag


    def run(self):
        """Call the kill function of the stored thread."""

        # Run the thread's kill code.
        target = self.thread
        target.kill(self.print_flag)




# Main threading loop for setting up threading.
###############################################

class RelaxHostParentThread(RelaxParentThread):
    """Threaded threading setup (for faster ssh responses)."""

    def __init__(self, relax, host_data):
        """Store the arguments and immediately run the main loop, one job and one thread per host entry.

        relax:      The relax object, stored as self.relax.
        host_data:  The list of host entries, stored as self.host_data.
        """

        # Arguments.
        self.relax = relax
        self.host_data = host_data

        # Number of threads and number of jobs.
        num_hosts = len(self.host_data)
        self.num_jobs = num_hosts
        self.num_threads = num_hosts

        # Run the main loop (without a random tag and without saving the program state).
        self.run(tag=0, save_state=0)


    def thread_object(self, i):
        """Function for returning an initialised thread object.

        i:  The thread number (not used, every thread is given the full host data).
        """

        # The thread object.
        thread = RelaxHostThread(self.relax, self.job_queue, self.results_queue, self.finished_jobs, self.job_locks, self.host_data)
        return thread



# Threads for setting up threading.
###################################

class RelaxHostThread(RelaxThread):
    """Threads for setting up threading, each job being the testing of one host entry."""

    def __init__(self, relax, job_queue, results_queue, finished_jobs, job_locks, host_data):
        """Initialisation of the thread.

        relax:          The relax object, whose thread_data is filled by post_locked_code().
        job_queue:      The queue from which job numbers are taken.
        results_queue:  The queue onto which the numbers of completed jobs are placed.
        finished_jobs:  The array of flags, indexed by job number.
        job_locks:      The list of locks, indexed by job number.
        host_data:      The list of host entries, indexed by job number.

        Only Thread.__init__() is called here, not RelaxThread.__init__().
        """

        # Run the Thread __init__ function (this is 'asserted' by the Thread class).
        Thread.__init__(self)

        # Arguments.
        self.relax = relax
        self.job_queue = job_queue
        self.results_queue = results_queue
        self.finished_jobs = finished_jobs
        self.job_locks = job_locks
        self.host_data = host_data

        # Relax will not be spawned on the host machine (except for testing).
        self.spawn_relax_flag = 0

        # Kill flag.
        self.kill_flag = 0


    def pre_locked_code(self):
        """Code to run prior to locking the job.

        The host entry for the current job is tested (SSH connection, working directory, and
        relax execution, stopping at the first failure).  The outcome is stored in self.fail and
        the text to print in self.text.
        """

        # Expand the data structures.
        self.host_name, self.user, self.login, self.login_cmd, self.prog_path, self.swd, self.priority = self.host_data[self.job_number]

        # Host failure flag.
        self.fail = 0

        # Text.
        self.text = []
        self.text.append("%-20s%-10s" % ("Host name:", self.host_name))
        self.text.append("%-20s%-10s" % ("User name:", self.user))
        self.text.append("%-20s%-10s" % ("Program path:", self.prog_path))
        self.text.append("%-20s%-10s" % ("Working directory:", self.swd))
        self.text.append("%-20s%-10i" % ("Priority:", self.priority))

        # Test the SSH connection (not needed for the local machine).
        if self.host_name != 'localhost' and not self.fail and not self.test_ssh():
            self.fail = 1

        # Test the working directory.
        if not self.fail and not self.test_wd():
            self.fail = 1

        # Test if relax works.
        if not self.fail and not self.test_relax():
            self.fail = 1

        # Host is accessible.
        if not self.fail:
            self.text.append("%-20s%-10s" % ("Host status:", "[ OK ]"))


    def post_locked_code(self):
        """Code to run after locking the job.

        The text from the host tests is printed and, if the host passed, its details are appended
        to the lists of self.relax.thread_data.
        """

        # Print the results.
        for line in self.text:
            print(line)
        print("\n")

        # Add all good hosts to self.relax.thread_data
        if not self.fail:
            thread_data = self.relax.thread_data

            # Status.
            if not thread_data.status:
                thread_data.status = 1

            # Store the details.
            thread_data.host_name.append(self.host_name)
            thread_data.user.append(self.user)
            thread_data.login.append(self.login)
            thread_data.login_cmd.append(self.login_cmd)
            thread_data.prog_path.append(self.prog_path)
            thread_data.swd.append(self.swd)
            thread_data.priority.append(self.priority)


    def test_relax(self):
        """Function for testing if the program path is valid and that relax can execute.

        Returns 1 if 'relax --test' writes nothing to stderr, otherwise the errors are added to
        self.text and 0 is returned.
        """

        # Test command.
        cmd = "%s --test" % self.prog_path
        err = self.start_child(cmd=cmd, catch_err=1)

        # No errors.
        if not err:
            return 1

        # Print out.
        self.text.append("Cannot execute relax on %s using the program path %s" % (self.login, repr(self.prog_path)))
        for line in err:
            self.text.append(line[0:-1])

        # Return fail.
        return 0


    def test_ssh(self):
        """Function for testing the SSH connection and public key authentication.

        Returns 1 if the text 'relax> ssh ok' comes back from the host.  Otherwise a failure
        message and the lines of stderr are added to self.text, and 0 is returned.
        """

        # Test command (password authentication is switched off so that ssh cannot hang on a prompt).
        cmd = "ssh -o PasswordAuthentication=no -o ForwardX11=no %s \"echo 'relax> ssh ok'\"" % self.login
        self.start_child(cmd=cmd, remote_exe=0, close=0)

        # Read the pipes, making sure that they are closed whatever the outcome.
        try:
            # Test if the string 'relax> ssh ok' is in self.child.fromchild.
            for line in self.child.fromchild.readlines():
                if search('relax> ssh ok', line):
                    return 1

            # Read the errors.
            err = self.child.childerr.readlines()
        finally:
            self.close_all_pipes()

        # SSH failure message (given even if ssh wrote nothing to stderr).
        self.text.append("Cannot establish a SSH connection to %s." % self.login)

        # Public key authentication has failed.
        for line in err:
            if search('Permission denied', line):
                self.text.append("Public key authenication failed.")
                break

        # All other errors.
        for line in err:
            self.text.append(line[0:-1])

        # Return fail.
        return 0


    def test_wd(self):
        """Function for testing if the working directory on the host machine exists.

        Returns 1 if the directory test echoes 'OK', otherwise a message is added to self.text
        and 0 is returned.
        """

        # Test command.
        cmd = "if test -d %s; then echo 'OK'; fi" % self.swd
        out = self.start_child(cmd=cmd, catch_out=1)

        # Directory exists.
        for line in out:
            if search('OK', line):
                return 1

        # No directory.
        self.text.append("Cannot find the working directory %s on %s." % (self.swd, self.host_name))
        return 0
856