Python requests - threads/processes vs IO

I connect to a local server (OSRM) via HTTP to submit routes and get back driving times. I notice that I/O is slower than threading because it seems the waiting time for the calculation is smaller than the time it takes to send the request and process the JSON output (I think I/O is better when the server takes some time to handle your request -> you don't want it to be blocking because you have to wait; that's not my case). Threading suffers from the Global Interpreter Lock, so it appears (plus the evidence below) that my fastest option is multiprocessing.

The problem with multiprocessing is that it is so fast that it exhausts my sockets and I get errors (each request opens a new connection). I can use a requests.Session() object (serially) to keep connections alive; however, I can't get this working in parallel (each process has its own session).
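(For reference, `url_routes` below is a list of (url, query-id) pairs. A minimal sketch of how it might be built from the /viaroute URLs visible in the error messages - the host, port and coordinate pairs here are assumptions:)

# Sketch (assumption): build the (url, query-id) pairs used as `url_routes` below,
# based on the /viaroute endpoint visible in the error messages.
host, port = '127.0.0.1', 5005

# hypothetical origin/destination pairs: (from_lat, from_lon, to_lat, to_lon)
coord_pairs = [
    (44.779708, 4.2609877, 44.648439, 4.2811959),
    (49.343430, 3.3019900, 49.566550, 3.2583700),
]

url_routes = [
    ('http://{0}:{1}/viaroute?loc={2},{3}&loc={4},{5}&alt=false&geometry=false'.format(
        host, port, f_lat, f_lon, t_lat, t_lon), qid)
    for qid, (f_lat, f_lon, t_lat, t_lon) in enumerate(coord_pairs)
]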

The closest thing I have to working code is this multiprocessing version:

import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

However, I can't get HTTPConnectionPool to work properly; it creates new sockets each time (I think) and then gives me the error:

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))


My goal is to get distance calculations from an OSRM routing server that I run locally, as quickly as possible. The question is really in two parts - basically I'm trying to convert some code using multiprocessing.Pool() into better code (proper asynchronous functions, so that execution never breaks and it runs as fast as possible).

The problem I'm having is that everything I try seems to be slower than multiprocessing (I give several examples of what I've tried below).

Some potential approaches: gevent, grequests, Tornado, requests-futures, asyncio, etc.

A - Multiprocessing.Pool()

I initially started out with something like this:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)

Here I connect to my local server (localhost, port 5005), which is launched with 8 threads and supports parallel execution.

After some searching I realised the error I was getting was because requests is opening a new connection/socket for each request. So this was actually too fast and exhausted the sockets after a while. It seems the way to solve this is to use requests.Session() - however I haven't been able to get this to work with multiprocessing (where each process has its own session).
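(Side note: within a single process, requests can be made to reuse a bounded set of sockets by mounting an HTTPAdapter with a connection pool on a Session - a minimal sketch with arbitrary pool sizes; this doesn't solve the multiprocessing case, where each process still needs its own Session:)

import requests
from requests.adapters import HTTPAdapter

# Sketch: reuse a bounded set of sockets within one process by mounting an
# HTTPAdapter on a Session (pool sizes here are arbitrary assumptions).
session = requests.Session()
session.mount('http://', HTTPAdapter(pool_connections=1, pool_maxsize=100, max_retries=3))
# response = session.get('http://127.0.0.1:5005/viaroute?loc=...&loc=...')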

Question 1:

On some computers this runs fine, e.g.:

[screenshot: CPU usage]

To compare with later approaches: 45% server usage and 1700 requests per second.

However, on some machines it doesn't, and I don't fully understand why:

HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))

My guess is that, because requests locks the socket while it is in use, sometimes the server is too slow responding to an old request while a new one is being made. The server supports queueing, but requests doesn't, so instead of being added to a queue I get the error?

Question 2:

I found this:

Blocking Or Non-Blocking?

With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.

If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.

Two excellent examples are grequests and requests-futures.

B - Requests-futures

To get around this I needed to rewrite my code to use asynchronous requests, so I tried the following using:

from requests_futures.sessions import FuturesSession 
from concurrent.futures import ThreadPoolExecutor, as_completed 

(By the way, I start my server with the option to use all threads.)

And the main code:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i] # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
        # Process the futures as they become complete
        for future in as_completed(futures):
            r = future.result()
            try:
                row = [futures[future]] + r.data
            except Exception as err:
                print('No route')
                row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
            calc_routes.append(row)

Where my function (ReqOsrm) is now rewritten as:

def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out

However, this code is slower than the multiprocessing version! Before, I was getting around 1700 requests per second; now I'm getting 600. I guess this is because I don't have full CPU utilisation, but I don't know how to increase it.

[screenshot: CPU usage]

C - Threading

I tried another approach (creating threads), but again I wasn't sure how to get this to maximise CPU usage (ideally I'd like to see my server at 50% usage, no?):

def doWork():
    while True:
        url, qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)

for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
    q.join()
except Exception:
    pass

# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

This approach is faster than requests_futures, I think, but I don't know how many threads to set to maximise it -

[screenshot: CPU usage]
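(A simple way to answer the "how many threads" question is just to time complete runs at a few candidate values; a rough sketch reusing getReq from above - the thread counts tried are arbitrary assumptions:)

import time
from queue import Queue
from threading import Thread

# Sketch (assumption): time a complete run for a few candidate thread counts and
# report requests per second; each run gets its own local queues and workers.
def run_with_threads(n_threads, urls):
    work_q = Queue()
    res_q = Queue()

    def worker():
        while True:
            url, qid = work_q.get()
            status, resp = getReq(url)  # getReq as defined above
            try:
                json_geocode = resp.json()
                out = [qid, status,
                       json_geocode['route_summary']['total_time'],
                       json_geocode['route_summary']['total_distance']]
            except Exception:
                out = [qid, 999, 0, 0]
            res_q.put(out)
            work_q.task_done()

    for _ in range(n_threads):
        t = Thread(target=worker)
        t.daemon = True
        t.start()

    start = time.time()
    for item in urls:
        work_q.put(item)
    work_q.join()
    return len(urls) / (time.time() - start)

for n in (50, 100, 250, 500, 1000):
    print("%d threads -> %.0f RPS" % (n, run_with_threads(n, url_routes)))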

D - Tornado (not working)

I'm now trying Tornado, but I can't quite get it to work. If I use curl_httpclient it crashes (exit code -1073741819); if I use simple_httpclient it works, but then I get timeout errors:

ERROR:tornado.application:Multiple exceptions in yield list Traceback (most recent call last): File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback result_list.append(f.result()) File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result raise_exc_info(self._exc_info) File "", line 3, in raise_exc_info tornado.httpclient.HTTPError: HTTP 599: Timeout

from functools import partial

from tornado import gen, ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient

def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
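(For what it's worth, the HTTP 599 above is Tornado's client-side request timeout. One thing that might help is raising request_timeout/connect_timeout on each fetch - a minimal sketch of the same coroutine with the timeouts raised; the 600-second values are arbitrary assumptions:)

# Sketch: same coroutine as above, but with the client-side timeouts raised
# (fetch() forwards these keyword arguments to HTTPRequest).
@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [
        http_client.fetch(url, connect_timeout=600, request_timeout=600)
        for url, qid in urls
    ]
    raise gen.Return(value=[handle_req(r.body) for r in responses])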

E - asyncio / aiohttp

I decided to try yet another approach (although it would be great to get Tornado working), using asyncio and aiohttp.

import asyncio
import json

import aiohttp

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but no route for {0} - status: {1}".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of concurrent requests
    semaphore = asyncio.Semaphore(num_chunks)

    # Return co-routine that will download files asynchronously and respect
    # the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

@asyncio.coroutine
def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures; save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid, err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

This works fine, but it's still slower than multiprocessing!

[screenshot: CPU usage]


Another approach, other than fiddling with the optimal thread-pool size, is to use an event loop. You register requests with a callback and wait for the event loop to handle them whenever the response comes back. – dm03514


@dm03514 Thanks for that! However, isn't that what I have in my requests-futures example? future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp)) – mptevsion


I've never used RequestsFutures, but I think it still defers to a thread pool; an event loop is a new request model altogether and only exposes a single thread, so you don't have to worry about how many threads to configure to do the work :) Python has one in the standard library; there is also https://pypi.python.org/pypi/aiohttp, which I've never used but looks relatively straightforward. Tornado is a framework built on OS event libraries and has a simple API: http://tornadokevinlee.readthedocs.org/en/latest/httpclient.html – dm03514

Answers


Looking at the multiprocessing code at the top of the question: it seems that HTTPConnectionPool() is called every time ReqOsrm is called, so a new pool is created for each URL. Instead, use the initializer and initargs parameters of Pool to create a single connection pool per process.

import json
from multiprocessing import Pool
from urllib3 import HTTPConnectionPool

conn_pool = None

def makePool(host, port):
    global conn_pool
    conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(url_input):
    ul, qid = url_input
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

if __name__ == "__main__":
    # run:
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005))
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()

The requests-futures version appears to have an indentation error. The loop for future in as_completed(futures): is indented under the outer loop for i in range(len(url_routes)):. So a request is made in the outer loop, and then the inner loop waits for that future to return before the next iteration of the outer loop. This makes the requests run serially rather than in parallel.

I think the code should be as follows:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit all the requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i] # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid

    # this was indented under the code in section B of the question
    # process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        print(row)
        calc_routes.append(row)

Question 1

You get the error because this approach:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)

creates a new TCP connection for every requested URL, and at some point it fails simply because the system has no free local ports left. To confirm this, you can run netstat while your code is executing:

netstat -a -n | find /c "localhost:5005" 

This will give you the number of connections to the server.

Also, reaching 1700 RPS looks rather unrealistic for this approach, since requests.get is quite an expensive operation, and it's unlikely you can get even 50 RPS this way. So you probably need to double-check your RPS calculations.
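(For reference, one simple way to check the overall RPS is to time the whole run and divide - a sketch reusing the Pool/ReqOsrm setup from the question:)

import time

# Sketch: measure overall RPS by timing the whole run, reusing the Pool/ReqOsrm
# setup from the question.
start = time.time()
pool = Pool(cpu_count() - 1)
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
elapsed = time.time() - start
print("%d requests in %.1f s -> %.0f RPS" % (len(url_routes), elapsed, len(url_routes) / elapsed))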

To avoid the error, you need to use sessions instead of creating connections from scratch:

import multiprocessing
import requests
import time


class Worker(multiprocessing.Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout

    def run(self):
        s = requests.session()
        while not self.qin.empty():
            result = s.get(self.qin.get())
            self.qout.put(result)
            self.qin.task_done()

if __name__ == '__main__':
    start = time.time()

    qin = multiprocessing.JoinableQueue()
    [qin.put('http://localhost:8080/') for _ in range(10000)]

    qout = multiprocessing.Queue()

    [Worker(qin, qout).start() for _ in range(multiprocessing.cpu_count())]

    qin.join()

    result = []
    while not qout.empty():
        result.append(qout.get())

    print(time.time() - start)
    print(result)

Question 2

You won't get higher RPS with a threading or async approach unless I/O takes more time than the calculations (e.g. high network latency, big responses, etc.), because the threads are affected by the GIL since they run in the same Python process, and async libraries may be blocked by long-running calculations.

Although threads or async libraries can improve performance, running the same threaded or asynchronous code in multiple processes will give you even better performance.
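(A minimal sketch of what "the same threaded code in multiple processes" could look like - each process gets a chunk of the URLs, its own thread-safe urllib3 pool and a small thread pool; the process/thread counts and pool sizes here are arbitrary assumptions, and url_routes is the (url, query-id) list from the question:)

import json
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool, cpu_count

from urllib3 import HTTPConnectionPool

# Sketch (assumption): run the threaded approach inside several processes.
def fetch_chunk(chunk):
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=10)

    def fetch_one(item):
        url, qid = item
        try:
            data = json.loads(conn_pool.request('GET', url).data.decode('utf-8'))
            return [qid, 200,
                    data['route_summary']['total_time'],
                    data['route_summary']['total_distance']]
        except Exception:
            return [qid, 999, 0, 0]

    with ThreadPoolExecutor(max_workers=10) as executor:
        return list(executor.map(fetch_one, chunk))

if __name__ == '__main__':
    n_proc = cpu_count()
    chunks = [url_routes[i::n_proc] for i in range(n_proc)]
    with Pool(n_proc) as pool:
        calc_routes = [row for chunk in pool.map(fetch_chunk, chunks) for row in chunk]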


Thanks everyone for the help. I want to post my conclusions:

Since my HTTP requests go to a local server which processes the request instantly, it doesn't make much sense to use async approaches (compared with most cases, where requests are sent over the internet). The costly factor for me is actually sending the request and processing the feedback, which means I get much better speeds using multiple processes (threads suffer from the GIL). I should also use sessions to increase the speed (no need to close and re-open a connection to the SAME server), which also helps prevent port exhaustion.

Here are all the approaches tried (that worked), with example RPS:

Serial

S1. Serial GET requests (no session) -> 215 RPS

def ReqOsrm(data):
    url, qid = data
    try:
        response = requests.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]

S2. Serial GET requests (requests.Session()) -> 335 RPS

session = requests.Session()
def ReqOsrm(data):
    url, qid = data
    try:
        response = session.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]

S3. Serial GET requests (urllib3.HTTPConnectionPool) -> 545 RPS

conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]

Async IO

A4. asyncio with aiohttp -> 450 RPS

import asyncio
import aiohttp
concurrent = 100
def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['paths'][0]['time']
    tot_dist_m = json_geocode['paths'][0]['distance']
    return [qid, 200, tot_time_s, tot_dist_m]
def chunked_http_client(num_chunks):
    # Use semaphore to limit number of concurrent requests
    semaphore = asyncio.Semaphore(num_chunks)
    # Return co-routine that will download files asynchronously and respect
    # the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            with aiohttp.ClientSession() as session:
                response = yield from session.get(url)
                body = yield from response.content.read()
                yield from response.wait_for_close()
        return body, qid
    return http_get
@asyncio.coroutine
def run_experiment(urls):
    http_client = chunked_http_client(num_chunks=concurrent)
    # http_client returns futures, save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid, err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

A5. Threading without a session -> 330 RPS

from threading import Thread
from queue import Queue
concurrent = 100
def doWork():
    while True:
        url, qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A6. Threading with HTTPConnectionPool -> 1550 RPS

from threading import Thread
from queue import Queue
from urllib3 import HTTPConnectionPool
concurrent = 100
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
def doWork():
    while True:
        url, qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = conn_pool.request('GET', url)
        return resp.status, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A7. Requests-futures -> 520 RPS

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
concurrent = 100
def ReqOsrm(sess, resp):
    try:
        json_geocode = resp.json()
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err)
        out = [999, 0, 0]
    resp.data = out
#Run:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i] # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0]
        calc_routes.append(row)

Multiprocessing

P8. multiprocessing worker + queue + requests.session() -> 1058 RPS

from multiprocessing import *
class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        s = requests.session()
        while not self.qin.empty():
            url, qid = self.qin.get()
            data = s.get(url)
            self.qout.put(ReqOsrm(data, qid))
            self.qin.task_done()
def ReqOsrm(resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid)
        return [qid, 999, 0, 0]
# Run:
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())

P9. multiprocessing worker + queue + HTTPConnectionPool() -> 1230 RPS
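(No code was posted for P9 - a minimal sketch of what it presumably looked like: the P8 worker, but with a per-process urllib3 HTTPConnectionPool instead of a requests session; this reconstruction is an assumption, reusing the ghost/gport host variables from the other snippets:)

from multiprocessing import Process, JoinableQueue, Queue, cpu_count
from urllib3 import HTTPConnectionPool
import json

# Sketch (assumption): P8's worker with a per-process urllib3 connection pool.
class PoolWorker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(PoolWorker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
        while not self.qin.empty():
            url, qid = self.qin.get()
            try:
                data = json.loads(conn_pool.request('GET', url).data.decode('utf-8'))
                out = [qid, 200, data['paths'][0]['time'], data['paths'][0]['distance']]
            except Exception as err:
                print("Error: ", err, qid)
                out = [qid, 999, 0, 0]
            self.qout.put(out)
            self.qin.task_done()
# Run (same driver as P8):
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[PoolWorker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())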

P10. multiprocessing v2 (not really sure how this is different) -> 1350 RPS

conn_pool = None
def makePool(host, port):
    global conn_pool
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        return [qid, 999, 0, 0]
# Run:
pool = Pool(initializer=makePool, initargs=(ghost, gport))
calc_routes = pool.map(ReqOsrm, url_routes)

So in the end the best approach for me seems to be #10 (and, surprisingly, #6).


Another approach you could try is multiprocessing with asyncio (or gevent). I've only used gevent, but since it's single-threaded coroutines it will only make use of a single core. Coroutine switches should be faster than threads, so multiprocessing + coroutines might be the fastest. –


Are you going to select an answer? – RootTwo


I get an error when running P8: ChunkedEncodingError(ProtocolError('Connection broken: IncompleteRead(162 bytes read)', IncompleteRead(162 bytes read))) – Phillip
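(Following up on the multiprocessing + coroutines suggestion in the first comment above, a minimal sketch of that idea: each process runs its own asyncio event loop with aiohttp over a chunk of the URLs. This uses the current async/await syntax rather than the yield-from style above, and the chunking, concurrency limit and JSON keys are assumptions:)

import asyncio
import json
from multiprocessing import Pool, cpu_count

import aiohttp

# Sketch (assumption): multiprocessing + coroutines - one asyncio event loop
# with aiohttp per process, each handling a chunk of the (url, qid) pairs.
async def fetch_one(session, semaphore, url, qid):
    async with semaphore:
        try:
            async with session.get(url) as resp:
                data = json.loads((await resp.read()).decode('utf-8'))
                return [qid, 200, data['paths'][0]['time'], data['paths'][0]['distance']]
        except Exception:
            return [qid, 999, 0, 0]

async def fetch_all(chunk):
    semaphore = asyncio.Semaphore(100)  # per-process concurrency limit (assumption)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch_one(session, semaphore, url, qid) for url, qid in chunk))

def run_chunk(chunk):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(fetch_all(chunk))
    finally:
        loop.close()

if __name__ == '__main__':
    n_proc = cpu_count()
    chunks = [url_routes[i::n_proc] for i in range(n_proc)]
    with Pool(n_proc) as pool:
        calc_routes = [row for chunk in pool.map(run_chunk, chunks) for row in chunk]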


Here is a pattern I use with gevent, which is coroutine-based and may not suffer from the GIL. This might be faster than using threading, and could be fastest when combined with multiprocessing (currently it only uses 1 core):

from gevent import monkey
monkey.patch_all()

import logging
import random
import time
from threading import Thread

from gevent.queue import JoinableQueue
from logger import initialize_logger  # author's own logging helper

initialize_logger()
log = logging.getLogger(__name__)


class Worker(Thread):

    def __init__(self, worker_idx, queue):
        # initialize the base class
        super(Worker, self).__init__()
        self.worker_idx = worker_idx
        self.queue = queue

    def log(self, msg):
        log.info("WORKER %s - %s" % (self.worker_idx, msg))

    def do_work(self, line):
        #self.log(line)
        time.sleep(random.random()/10)

    def run(self):
        while True:
            line = self.queue.get()
            self.do_work(line)
            self.queue.task_done()


def main(number_of_workers=20):
    start_time = time.time()

    queue = JoinableQueue()
    for idx in range(number_of_workers):
        worker = Worker(idx, queue)
        # "daemonize" a thread to ensure that the threads will
        # close when the main program finishes
        worker.daemon = True
        worker.start()

    for idx in range(100):
        queue.put("%s" % idx)

    queue.join()
    time_taken = time.time() - start_time
    log.info("Parallel work took %s seconds." % time_taken)

    start_time = time.time()
    for idx in range(100):
        #log.info(idx)
        time.sleep(random.random()/10)
    time_taken = time.time() - start_time
    log.info("Sync work took %s seconds." % time_taken)


if __name__ == "__main__":
    main()