2011-10-03 137 views
4

嗨,我在執行命令時出現問題,該命令通過Ubuntu 10服務器執行100MB文件的wget。除此之外,較短的命令可以正常工作。下面的類包含了我如何使用paramiko和我嘗試克服這個問題的不同嘗試(請參閱不同的run或exec方法)。在exec_cmd的情況下執行掛在這一行:Paramiko在執行大型wget命令時掛起

 out = self.in_buffer.read(nbytes, self.timeout) 

從paramiko的channel.py模塊的recv方法。

同樣的wget命令在使用Mac的普通ssh實用程序的shell中完美工作。

""" 
Management of SSH connections 
""" 

import logging 
import os 
import paramiko 
import socket 
import time 
import StringIO 


class SSHClient(): 
    def __init__(self): 
     self._ssh_client = paramiko.SSHClient() 
     self._ssh_client.load_system_host_keys() 
     self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
     self.time_out = 300 
     self.wait = 5 

    def connect(self, hostname, user, pkey): 
     retry = self.time_out 
     self.hostname = hostname 
     logging.info("connecting to:%s user:%s key:%s" % (hostname, user, pkey)) 
     while retry > 0: 
      try: 
       self._ssh_client.connect(hostname, 
             username=user, 
             key_filename=os.path.expanduser(pkey), 
             timeout=self.time_out) 
       return 
      except socket.error, (value,message): 
       if value == 61 or value == 111: 
        logging.warning('SSH Connection refused, will retry in 5 seconds') 
        time.sleep(self.wait) 
        retry -= self.wait 
       else: 
        raise 
      except paramiko.BadHostKeyException: 
       logging.warning("%s has an entry in ~/.ssh/known_hosts and it doesn't match" % self.server.hostname) 
       logging.warning('Edit that file to remove the entry and then try again') 
       retry = 0 
      except EOFError: 
       logging.warning('Unexpected Error from SSH Connection, retry in 5 seconds') 
       time.sleep(self.wait) 
       retry -= self.wait 
     logging.error('Could not establish SSH connection') 

    def exists(self, path): 
     status = self.run('[ -a %s ] || echo "FALSE"' % path) 
     if status[1].startswith('FALSE'): 
      return 0 
     return 1 

    def shell(self): 
     """ 
     Start an interactive shell session on the remote host. 
     """ 
     channel = self._ssh_client.invoke_shell() 
     interactive_shell(channel) 

    def run(self, command): 
     """ 
     Execute a command on the remote host. Return a tuple containing 
     an integer status and a string containing all output from the command. 
     """ 
     logging.info('running:%s on %s' % (command, self.hostname)) 
     log_fp = StringIO.StringIO() 
     status = 0 
     try: 
      t = self._ssh_client.exec_command(command) 
     except paramiko.SSHException: 
      logging.error("Error executing command: " + command) 
      status = 1 
     log_fp.write(t[1].read()) 
     log_fp.write(t[2].read()) 
     t[0].close() 
     t[1].close() 
     t[2].close() 
     logging.info('output: %s' % log_fp.getvalue()) 
     return (status, log_fp.getvalue()) 

    def run_pty(self, command): 
     """ 
     Execute a command on the remote host with a pseudo-terminal. 
     Returns a string containing the output of the command. 
     """ 
     logging.info('running:%s on %s' % (command, self.hostname)) 
     channel = self._ssh_client.get_transport().open_session() 
     channel.get_pty() 
     status = 0 
     try: 
      channel.exec_command(command) 
     except: 
      logging.error("Error executing command: " + command) 
      status = 1 
     return status, channel.recv(1024) 

    def close(self): 
     transport = self._ssh_client.get_transport() 
     transport.close() 

    def run_remote(self, cmd, check_exit_status=True, verbose=True, use_sudo=False): 
     logging.info('running:%s on %s' % (cmd, self.hostname)) 
     ssh = self._ssh_client 
     chan = ssh.get_transport().open_session() 
     stdin = chan.makefile('wb') 
     stdout = chan.makefile('rb') 
     stderr = chan.makefile_stderr('rb') 
     processed_cmd = cmd 
     if use_sudo: 
      processed_cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"') 
     chan.exec_command(processed_cmd) 
     result = { 
      'stdout': [], 
      'stderr': [], 
     } 
     exit_status = chan.recv_exit_status() 
     result['exit_status'] = exit_status 

     def print_output(): 
      for line in stdout: 
       result['stdout'].append(line) 
       logging.info(line) 
      for line in stderr: 
       result['stderr'].append(line) 
       logging.info(line) 
     if verbose: 
      print processed_cmd 
      print_output() 
     return exit_status,result 

    def exec_cmd(self, cmd): 
     import select 
     ssh = self._ssh_client 
     channel = ssh.get_transport().open_session() 
     END = "CMD_EPILOGqwkjidksjk58754dskhjdksjKDSL" 
     cmd += ";echo " + END 
     logging.info('running:%s on %s' % (cmd, self.hostname)) 
     channel.exec_command(cmd) 
     out = "" 
     buf = "" 
     while END not in buf: 
      rl, wl, xl = select.select([channel],[],[],0.0) 
      if len(rl) > 0: 
       # Must be stdout 
       buf = channel.recv(1024) 
       logging.info(buf) 
       out += buf 
     return 0, out 

回答

2
  1. 在這種情況下,我會去的名單追加,然後串聯。爲什麼?那麼,Python中的字符串是不可變的。這意味着每次你使用+=時,你基本上都會創建兩個新字符串並讀取第三個字符串。另一方面,如果創建列表並追加它,則會將創建的字符串數量減半。
  2. 你真的需要多次撥打電話嗎?我的理解是,如果這個過程是線程阻塞,你並不在乎。由於select或多或少圍繞相同名稱的C-方法的包裝:

    的select()和PSELECT()允許一個程序來監視多個文件描述符,等待直到一個或多個所述文件描述符變成「準備「對於某些類別的I/O操作(例如可能的輸入)。如果可以在沒有阻塞的情況下執行相應的I/O操作(例如,讀取(2)),則文件描述符被準備好準備就緒。

  3. 你不聽的代碼中的socket.timeout異常。
  4. 寫入標準輸出/文件系統可能會很昂貴,但是您記錄的是由recv返回的每一行。你能移動日誌行嗎?
  5. 您是否考慮過手動處理閱讀頻道?技術上你唯一需要的代碼是:
try: 
    out = self.in_buffer.read(nbytes, self.timeout) 
except PipeTimeout, e: 
    # do something with error 

它不能保證,但會切出額外的處理。

2

我有同樣的問題,當我在遠程SSH客戶端上運行的shell腳本時,我的python腳本掛起,在400Mb文件上執行了wget命令。

我發現向wget命令添加超時修復了問題。 我原本:

wget的http://blah:8888/file.zip

這個現在

的wget -q -T90 http://blah:8888/file.zip

它就像一個魅力!

希望它有幫助。

+0

謝謝!正是我所需要的,-q對我來說已經足夠了.... –