感謝Martijn Pieters answer我停止了圍繞python請求行爲的工作,並尋找一種完全不同的方法。
我結束了使用pyCurl。您可以像使用Tornado等一樣使用它,類似於select + recv循環,無需反轉控制流並放棄對專用IO循環的控制。這種方式很容易使用發電機,一旦它們到達時就會產生新的線路 - 中間層無需進一步緩衝,這可能會引入運行IO環路的延遲或附加線程。
同時,它足夠高級,您不需要擔心分塊傳輸編碼,SSL加密或gzip壓縮。
這是我的舊代碼,其中chunk_size
= 1導致45%的CPU負載,並且引入了額外的延遲。
import requests
class RequestsHTTPStream(object):
def __init__(self, url):
self.url = url
def iter_lines(self):
headers = {'Cache-Control':'no-cache',
'Accept': 'text/event-stream',
'Accept-Encoding': 'gzip'}
response = requests.get(self.url, stream=True, headers=headers)
return response.iter_lines(chunk_size=1)
這裏是基於pyCurl我的新代碼: (不幸的是,curl_easy_ *風格perform
塊完全,這使得它很難產生在線路之間不使用線程因此我使用curl_multi_ *方法。 )
import pycurl
import urllib2
import httplib
import StringIO
class CurlHTTPStream(object):
def __init__(self, url):
self.url = url
self.received_buffer = StringIO.StringIO()
self.curl = pycurl.Curl()
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
self.curl.setopt(pycurl.ENCODING, 'gzip')
self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)
self.curlmulti = pycurl.CurlMulti()
self.curlmulti.add_handle(self.curl)
self.status_code = 0
SELECT_TIMEOUT = 10
def _any_data_received(self):
return self.received_buffer.tell() != 0
def _get_received_data(self):
result = self.received_buffer.getvalue()
self.received_buffer.truncate(0)
self.received_buffer.seek(0)
return result
def _check_status_code(self):
if self.status_code == 0:
self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
if self.status_code != 0 and self.status_code != httplib.OK:
raise urllib2.HTTPError(self.url, self.status_code, None, None, None)
def _perform_on_curl(self):
while True:
ret, num_handles = self.curlmulti.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
return num_handles
def _iter_chunks(self):
while True:
remaining = self._perform_on_curl()
if self._any_data_received():
self._check_status_code()
yield self._get_received_data()
if remaining == 0:
break
self.curlmulti.select(self.SELECT_TIMEOUT)
self._check_status_code()
self._check_curl_errors()
def _check_curl_errors(self):
for f in self.curlmulti.info_read()[2]:
raise pycurl.error(*f[1:])
def iter_lines(self):
chunks = self._iter_chunks()
return self._split_lines_from_chunks(chunks)
@staticmethod
def _split_lines_from_chunks(chunks):
#same behaviour as requests' Response.iter_lines(...)
pending = None
for chunk in chunks:
if pending is not None:
chunk = pending + chunk
lines = chunk.splitlines()
if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
pending = lines.pop()
else:
pending = None
for line in lines:
yield line
if pending is not None:
yield pending
此代碼試圖如果只有幾個到從傳入的流取的字節數可能的,而不會不必要地阻塞。相比之下,CPU負載大約爲0.2%
*你知道一個好的框架/ libraray的任務?*圖書館的請求是離題,我害怕。 –
對圖書館的要求抱歉。正如所寫,我目前正在使用python-requests,並希望繼續使用它。所以我的問題主要是如何用python-requests做些事情。但是:如果沒有辦法,我完全可以使用另一個庫。 –
尋找線條是可能的。當我在一個GZipped Response Stream中刷新Tornado中的響應時,底層壓縮zlib支持使用的刷新(Z_SYNC_FLUSH)。所以,http流非常好,分成完美的部分,其中完整的壓縮線。僅僅用Python閱讀它們是很困難的。 –