2017-04-17 93 views
0

我想使用subprocess.Popen()來執行一個C程序,並實時輸出它的輸出並將它發送給客戶端。但是,輸出緩衝並在執行結束時一起發送(Blocking性質)。我怎樣才能實時接收輸出,然後立即將其發送到Twisted Autobahn中。如何在Twisted [autobahn] websocket服務器中實時流式輸出?

def onConnect(self, request): 
    try: 
     self.cont_name = ''.join(random.choice(string.lowercase) for i in range(5)) 
     self.file_name = self.cont_name 
     print("Connecting...") 
    except Exception: 
     print("Failed"+str(Exception))  

def onOpen(self): 
    try: 
     print("open") 
    except Exception: 
     print("Couldn't create container") 

def onMessage(self, payload,isBinary=False): 
     cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name 
     a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1) 
     for line in iter(a.stdout.readline, b''): 
      line = line.encode('utf8') 
      self.sendMessage(line) 

def onClose(self, wasClean, code, reason): 
    try: 
     print("Closed container...") 
    except Exception: 
     print(str(Exception))  

當使用子過程執行該命令搬運工,C代碼的整個輸出在一次,而不是因爲它發生返回。對於前:

#include <stdio.h> 
#include <unistd.h> 
int main(){ 
int i=0; 
for(i=0;i<5;i++){ 
    fflush(stdout); 
    printf("Rounded\n"); 
    sleep(3); 
} 
} 

在容器中運行之後,程序應該在3秒到客戶端返回「四捨五入」。然而,它最終會在執行結束時發送所有'Rounded'。

+0

你的問題廣義上講是有道理的,但是*廣泛地說,它不是很清楚你遇到的問題在哪裏,或者有什麼建議可以幫助你克服它。你可以包含一些示例代碼嗎?最好按照http://sscce.org/的說法。 –

+0

@ Jean-PaulCalderone我添加了一些代碼。請確認一下。謝謝 –

回答

0

的不當行爲來自環路在該方法中:

def onMessage(self, payload,isBinary=False): 
     cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name 
     a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1) 
     for line in iter(a.stdout.readline, b''): 
      line = line.encode('utf8') 
      self.sendMessage(line) 

Twisted是一個協作多任務處理系統。默認情況下,所有內容都在單個線程中運行(「反應器線程」)。這意味着所有代碼必須定期(並且通常很快)放棄控制,以便其他代碼(應用程序代碼或Twisted實現代碼)有機會運行。該函數中的循環從子進程讀取並嘗試使用高速公路API發送數據 - 一遍又一遍,從不放棄控制。

阻塞來自Popen對象的讀取也可能導致問題。您不知道讀取會阻塞多長時間,因此您不知道多長時間會阻止其他代碼在反應器線程中運行。你既可以將您的POPEN讀取到一個新的線程,他們不會阻止反應堆螺紋:

def onMessage(self, payload,isBinary=False): 
    cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name 
    popen_in_thread(
     lambda line: reactor.callFromThread(
      lambda: self.sendMessage(line.encode("utf-8")) 
     ), 
     [cmd], shell=True, stdout=subprocess.PIPE, bufsize=1 
    ) 

def popen_in_thread(callback, *args, **kwargs): 
    def threaded(): 
     a = subprocess.Popen(*args, **kwargs) 
     for line in iter(a.stdout.readline, b''): 
      callback(line) 
    reactor.callInThread(threaded) 

或者,更好,使用雙絞線自身的流程支持:

def onMessage(self, payload,isBinary=False): 
    class ProcessLinesToMessages(ProcessProtocol): 
     def outReceived(self, output): 
      buf = self.buf + output 
      lines = buf.splitlines() 
      self.buf = lines.pop() 
      for line in lines: 
       self.sendMessage(line.encode("utf-8")) 
      while True: 
       line, self.buf = self.buf.splitline 
    reactor.spawnProcess(
     ProcessLinesToMessages(), 
     "docker", 
     [ 
      "docker", 
      "exec", 
      self.cont_name, 
      "/tmp/./ + self.file_name, 
     ], 
    ) 

(既不版本進行測試,希望這個想法很清楚)

+0

可以在這種情況下使用? –

+0

Deferred表示單個未來結果。您想要傳輸正在進行的流程中的數據。這兩件事情沒有很好地吻合。不過,您可能想使用管道,而不是像本示例代碼中所做的那樣手工製作回調。 –

相關問題