2014-02-15 19 views
7

我寫了一個HTTP服務器,它產生由JSON結構事件組成的無盡HTTP流。類似於Twitter的流媒體API。這些事件由\n(根據Server-sent events與Content-Type:文本/事件流)分隔,並且可能會有所不同。當他們到達時,從壓縮的,分塊的HTTP流中有效地讀取行

的響應是

  • 分塊(HTTP 1.1傳輸編碼:分塊)由於層出不窮
  • 壓縮(內容編碼:gzip),以節省帶寬。

我希望在Python到達時儘快使用這些行,並儘可能節約資源,而不用重新發明輪子。

由於我目前正在使用python-requests,你知道如何使它工作嗎? 如果你認爲,python-requests在這裏沒有幫助,我完全開放其他框架/庫。

我目前的實施是基於requests並使用iter_lines(...)接收行。但chunk_size參數很棘手。如果設置爲1它非常強烈,因爲某些事件可能是幾千字節。如果設置爲大於1的任何值,則有些事件會卡住,直到下一次到達,並且整個緩衝區「已滿」。事件之間的時間可能會持續幾秒鐘。 我預計chunk_size是某種「接收的最大字節數」,如在unix的recv(...)中。相應的手冊頁說:

的接聽電話正常返回的任何數據可用,直到 請求的數量,而不是等待接收請求全額 的。

但這顯然不是它在請求庫中的工作方式。他們或多或少地使用它作爲「準確的接收字節數」。 在查看他們的源代碼時,我無法確定哪個部分負責。也許httplib的Response或ssl的SSLSocket。

作爲一種解決方法,我嘗試將服務器上的行填充到塊大小的倍數。 但是請求庫中的塊大小用於從壓縮的響應流中獲取字節。 所以這不會工作,直到我可以填充我的行,以便他們的壓縮字節序列是塊大小的倍數。但是這似乎太過分了。

我讀過Twisted可以用於客戶端上的HTTP流的非阻塞,非緩衝處理,但我只找到在服務器上創建流響應的代碼。

+0

*你知道一個好的框架/ libraray的任務?*圖書館的請求是離題,我害怕。 –

+0

對圖書館的要求抱歉。正如所寫,我目前正在使用python-requests,並希望繼續使用它。所以我的問題主要是如何用python-requests做些事情。但是:如果沒有辦法,我完全可以使用另一個庫。 –

+0

尋找線條是可能的。當我在一個GZipped Response Stream中刷新Tornado中的響應時,底層壓縮zlib支持使用的刷新(Z_SYNC_FLUSH)。所以,http流非常好,分成完美的部分,其中完整的壓縮線。僅僅用Python閱讀它們是很困難的。 –

回答

8

感謝Martijn Pieters answer我停止了圍繞python請求行爲的工作,並尋找一種完全不同的方法。

我結束了使用pyCurl。您可以像使用Tornado等一樣使用它,類似於select + recv循環,無需反轉控制流並放棄對專用IO循環的控制。這種方式很容易使用發電機,一旦它們到達時就會產生新的線路 - 中間層無需進一步緩衝,這可能會引入運行IO環路的延遲或附加線程。

同時,它足夠高級,您不需要擔心分塊傳輸編碼,SSL加密或gzip壓縮。

這是我的舊代碼,其中chunk_size = 1導致45%的CPU負載,並且引入了額外的延遲。

import requests 
class RequestsHTTPStream(object): 
    def __init__(self, url): 
     self.url = url 

    def iter_lines(self): 
     headers = {'Cache-Control':'no-cache', 
        'Accept': 'text/event-stream', 
        'Accept-Encoding': 'gzip'} 
     response = requests.get(self.url, stream=True, headers=headers) 
     return response.iter_lines(chunk_size=1) 

這裏是基於pyCurl我的新代碼: (不幸的是,curl_easy_ *風格perform塊完全,這使得它很難產生在線路之間不使用線程因此我使用curl_multi_ *方法。 )

import pycurl 
import urllib2 
import httplib 
import StringIO 

class CurlHTTPStream(object): 
    def __init__(self, url): 
     self.url = url 
     self.received_buffer = StringIO.StringIO() 

     self.curl = pycurl.Curl() 
     self.curl.setopt(pycurl.URL, url) 
     self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream']) 
     self.curl.setopt(pycurl.ENCODING, 'gzip') 
     self.curl.setopt(pycurl.CONNECTTIMEOUT, 5) 
     self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write) 

     self.curlmulti = pycurl.CurlMulti() 
     self.curlmulti.add_handle(self.curl) 

     self.status_code = 0 

    SELECT_TIMEOUT = 10 

    def _any_data_received(self): 
     return self.received_buffer.tell() != 0 

    def _get_received_data(self): 
     result = self.received_buffer.getvalue() 
     self.received_buffer.truncate(0) 
     self.received_buffer.seek(0) 
     return result 

    def _check_status_code(self): 
     if self.status_code == 0: 
      self.status_code = self.curl.getinfo(pycurl.HTTP_CODE) 
     if self.status_code != 0 and self.status_code != httplib.OK: 
      raise urllib2.HTTPError(self.url, self.status_code, None, None, None) 

    def _perform_on_curl(self): 
     while True: 
      ret, num_handles = self.curlmulti.perform() 
      if ret != pycurl.E_CALL_MULTI_PERFORM: 
       break 
     return num_handles 

    def _iter_chunks(self): 
     while True: 
      remaining = self._perform_on_curl() 
      if self._any_data_received(): 
       self._check_status_code() 
       yield self._get_received_data() 
      if remaining == 0: 
       break 
      self.curlmulti.select(self.SELECT_TIMEOUT) 

     self._check_status_code() 
     self._check_curl_errors() 

    def _check_curl_errors(self): 
     for f in self.curlmulti.info_read()[2]: 
      raise pycurl.error(*f[1:]) 

    def iter_lines(self): 
     chunks = self._iter_chunks() 
     return self._split_lines_from_chunks(chunks) 

    @staticmethod 
    def _split_lines_from_chunks(chunks): 
     #same behaviour as requests' Response.iter_lines(...) 

     pending = None 
     for chunk in chunks: 

      if pending is not None: 
       chunk = pending + chunk 
      lines = chunk.splitlines() 

      if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: 
       pending = lines.pop() 
      else: 
       pending = None 

      for line in lines: 
       yield line 

     if pending is not None: 
      yield pending 

此代碼試圖如果只有幾個到從傳入的流取的字節數可能的,而不會不必要地阻塞。相比之下,CPU負載大約爲0.2%

+0

這個'self.received_buffer.truncate(0)'不會導致數據丟失嗎?我的意思是通過回調方法('self.received_buffer.write')寫入'self.received_buffer'的數據,但尚未讀取'self.received_buffer.getvalue()'? – Amit

+0

寫入不會異步發生,WRITEFUNCTION只在perform()中調用。當你應該調用執行讀/寫數據時,select()表示不進行不必要的輪詢。 –

+0

不錯的工作!無論如何,你可以添加一個如何使用它的例子嗎?展示如何將它與progressbar.ProgressBar一起使用,我們將不勝感激。 –

6

這不是requests'iter_lines()呼叫阻塞的錯誤。

Response.iter_lines()方法調用Response.iter_content(),它調用urllib3HTTPResponse.stream(),它調用HTTPResponse.read()

這些調用傳遞一個塊大小,這是傳遞給套接字的那個,如self._fp.read(amt)。這是有問題的調用,因爲self._fp是由socket.makefile()生成的文件對象(由httplib module完成);並且這個.read()調用塊直到amt(壓縮)字節被讀取。

該底層套接字文件對象確實支持.readline()調用,該調用將更高效地工作,但urllib3在處理壓縮數據時不能使用此調用;行結束符不會在壓縮流中可見。

遺憾的是,urllib3在響應未被壓縮時也不會調用self._fp.readline();調用的結構方式很難傳遞,你希望以行緩衝模式讀取,而不是像塊緩衝模式那樣讀取。

我必須說HTTP不是用於流式事件的最佳協議;我會爲此使用不同的協議。 Websockets想到了,或者針對您的特定用例的自定義協議。

+0

感謝您指出,「問題」在請求使用的http堆棧中是深入的! –

+0

@ThomasB:是的,也沒有我知道的圖書館會以一種乾淨的方式解決這個問題。壓縮在這裏沒有幫助;你將不得不使用'urllib2',然後從響應對象訪問原始套接字,執行非阻塞式讀取並進行自己的解壓縮。不漂亮。 –

+0

我在閱讀[SSE vs WebSockets]後選擇了SSE(http://stackoverflow.com/questions/5195452/websockets-vs-server-sent-events-eventsource)。我只需要下游,JS客戶端超級簡單,服務器超級簡單,反向代理,加密,壓縮在HTTP上正常工作。 –