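Python requests - threads/processes vs. IO
I am connecting to a local server (OSRM) via HTTP to submit routes and get back drive times. I notice that asynchronous I/O is slower than threading, because it seems the wait for the calculation is shorter than the time it takes to send the request and process the JSON output (I think async I/O is better when the server takes some time to process your request -> you don't want to block while waiting; this is not my case). Threading suffers from the Global Interpreter Lock, so it appears (evidence below) that my fastest option is to use multiprocessing.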
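The problem with multiprocessing is that it is so fast that it exhausts my sockets and I get an error (requests opens a new connection for every request). I can (serially) use a requests.Session() object to keep the connection alive, but I can't get this to work in parallel (each process has its own session).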
The closest I have come to working code is this multiprocessing version:
import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run (on Windows this part must sit under `if __name__ == '__main__':`):
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
However, I can't get the HTTPConnectionPool to work properly; it creates a new socket each time (I think) and then gives me the error:
HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
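My goal is to get distance calculations from an OSRM routing server that I am running locally, as quickly as possible. My question has two parts - basically I am trying to convert some code that uses multiprocessing.Pool() into better code (proper asynchronous functions, so that execution never breaks and it runs as fast as possible).
The issue I am having is that everything I try seems slower than multiprocessing (several examples of what I have tried are below).
Some potential methods are: gevent, grequests, tornado, requests-futures, asyncio, etc.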
A - Multiprocessing.Pool()
I initially started with something like this:
def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
                ....

pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)
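Here I was connecting to a local server (localhost, port 5005) that was launched on 8 threads and supports parallel execution.
After a bit of searching I realised the error I was getting was because requests was opening a new connection/socket for each request. So this was actually too fast and was exhausting the sockets after a while. It seems the way to address this is to use a requests.Session() - however I haven't been able to get this working with multiprocessing (where each process has its own session).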
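To get a feel for why one new socket per request runs out so quickly, here is a back-of-the-envelope sketch. It assumes that every closed client connection keeps its local port reserved for a fixed TIME_WAIT period. The figures in the usage note (16384 dynamic ports, the default range 49152-65535 on recent Windows versions, and a 120 second wait) are illustrative assumptions rather than values measured on this machine, and seconds_until_exhaustion is a hypothetical helper name.

```python
def seconds_until_exhaustion(requests_per_second, ephemeral_ports, time_wait_seconds):
    """Return how long a burst lasts before no local port is free, or None if it never runs out.

    Simplified model: each request consumes one local port, and that port only
    becomes reusable time_wait_seconds after the connection is closed.
    """
    # In steady state, rate * time_wait ports are parked in TIME_WAIT at once.
    if requests_per_second * time_wait_seconds <= ephemeral_ports:
        return None
    # Otherwise the range fills before the first port is released.
    return ephemeral_ports / requests_per_second
```

With the assumed figures, seconds_until_exhaustion(1700, 16384, 120) comes out at under ten seconds, which would be consistent with the error appearing only after a while. The actual port range on a given Windows machine can be checked with `netsh int ipv4 show dynamicport tcp`.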
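Question 1:
On some computers this runs fine, e.g.:
(for later comparison: 45% server usage and 1700 requests per second)
However, on some it does not, and I don't fully understand why: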
HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
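My guess is that, since requests locks the socket while it is in use, sometimes the server is too slow to respond to the old request and a new one is generated. The server supports queueing, but requests does not, so instead of being added to the queue I get the error?
Question 2:
I found: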
Blocking Or Non-Blocking?
With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.
If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.
Two excellent examples are grequests and requests-futures.
B - Requests Futures
To fix this I needed to rewrite my code to use asynchronous requests, so I tried the following, using:
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
(By the way, I start my server with the option to use all threads)
And the main code:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url | query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        calc_routes.append(row)
Where my function (ReqOsrm) is now rewritten as:
def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out
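However, this code is slower than the multiprocessing one! Before I was getting around 1700 requests a second, now I am getting 600. I guess this is because I don't have full CPU utilisation, but I'm not sure how to increase it?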
C - Thread
I tried another method (creating threads) - but again I wasn't sure how to get this to maximise CPU usage (ideally I want to see my server using 50%, no?):
from queue import Queue
from threading import Thread
import requests

def doWork():
    # Worker loop: take a (url, qid) job, fetch it, store the parsed row
    while True:
        url, qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except Exception:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

# Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
    q.join()
except Exception:
    pass

# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
This method is faster than requests_futures, I think, but I don't know how many threads to set to maximise this.
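One way to pick the thread count empirically rather than guessing is to time the same batch at several pool sizes. This is a minimal sketch using only the standard library. The names throughput and sweep are hypothetical helpers, and task stands for whatever function performs one request and returns a row.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def throughput(task, items, workers):
    """Run task over items on `workers` threads; return (results, items per second)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(task, items))  # results keep the order of items
    elapsed = max(time.perf_counter() - start, 1e-9)  # guard against a zero interval
    return results, len(items) / elapsed


def sweep(task, items, worker_counts=(1, 2, 4, 8, 16, 32)):
    """Return {worker_count: items_per_second} so the best pool size can be read off."""
    return {n: throughput(task, items, n)[1] for n in worker_counts}
```

Running sweep on a few thousand representative URLs should show where adding threads stops helping. The numbers will depend on the machine and on how the server is configured, so a single run is only indicative.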
D - Tornado (not working)
I am now trying tornado, but can't quite get it working: it breaks with exit code -1073741819 if I use curl. If I use simple_httpclient it works, but then I get timeout errors:
ERROR:tornado.application:Multiple exceptions in yield list
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback
    result_list.append(f.result())
  File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "", line 3, in raise_exc_info
tornado.httpclient.HTTPError: HTTP 599: Timeout
from functools import partial
from tornado import gen, ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient

def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    # Yielding a list of futures waits for all of them in parallel
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
E - asyncio / aiohttp
I decided to try another approach (although it would be great to get tornado working) using asyncio and aiohttp.
import asyncio
import json
import aiohttp

# Note: generator-based coroutines (yield from), as used with Python 3.4

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but not route for {0} - status: {1}".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)

    # Return co-routine that will download files asynchronously and respect
    # locking of semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures
    # save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid, err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
This works OK, but it is still slower than multiprocessing!
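For reference, the semaphore-limited pattern can also be written with async/await syntax (Python 3.7+). This sketch is independent of any HTTP library: fetch is a placeholder for a coroutine that performs one request and returns a result row, and run_bounded is a hypothetical name. It has not been benchmarked against the variants in this post.

```python
import asyncio


async def run_bounded(fetch, jobs, limit):
    """Await fetch(url, qid) for every job with at most `limit` calls in flight.

    Results come back in the same order as jobs; a failed job yields the
    999 error row used elsewhere in this post.
    """
    semaphore = asyncio.Semaphore(limit)

    async def guarded(url, qid):
        async with semaphore:  # wait here while `limit` requests are already running
            try:
                return await fetch(url, qid)
            except Exception:
                return [qid, 999, 0, 0, 0, 0, 0, 0]

    return await asyncio.gather(*(guarded(url, qid) for url, qid in jobs))
```

It would be driven with something like asyncio.run(run_bounded(my_fetch, url_routes, limit=500)), where my_fetch is whatever coroutine wraps the HTTP client in use.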
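Another approach, other than fiddling with the optimal thread pool size, is to use an event loop. You could register the requests with a callback and wait for the event loop to handle each response whenever it is returned – dm03514
@dm03514 Thanks for that! However, isn't that what I have in my requests-futures example? 'future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))' – mptevsion
I've never used RequestFuture, but I think it still defers to a thread pool. The event loop should be a new request model altogether, and it only exposes a single thread, so you don't have to worry about how many threads to configure to do the work :) Python has one in the stdlibrary https://pypi.python.org/pypi/aiohttp , which I've never used but looks relatively straightforward; tornado is a framework built on OS event libraries and has a simple API. http://tornadokevinlee.readthedocs.org/en/latest/httpclient.html – dm03514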