2017-05-29 115 views
3

我們正在使用aiohttp構建休息api。我們的應用程序設計爲使用戶比接收響應更頻繁地發送請求(因爲計算時間)。對於用戶來說只是最新請求的結果很重要。是否有可能停止計算過時的請求?當新請求來自同一用戶時,如何取消先前的請求

謝謝

+0

,與其違反HTTP的核心原則之一。如果您想將該服務擴展到負載均衡器後面的多個服務器,該怎麼辦?那麼它會得到一些其他服務器可能處理的非常棘手的取消請求。這可能不是你應該試圖解決的實際問題! – deceze

+0

「取消」意味着什麼?服務器斷開連接? – deceze

+0

重寫後:這是否意味着一個HTTP請求可能需要幾分鐘(?)才能完成?這也是不好的HTTP設計;一個請求應該在幾秒鐘內最多回答,典型的基準目標是200ms。如果您需要更長時間的運行作業,請將其視爲作業:一個HTTP請求啓動後臺任務並返回任務ID。然後,客戶端可以使用另一個HTTP請求查詢有關作業狀態。例如。 'POST/tasks {params}','GET/tasks/42'。顯然,這些後臺任務可以通過任何在後端有意義的機制來取消。 – deceze

回答

3

你正在構建一個非常像HTTP的東西。一個HTTP請求不應該花費幾毫秒的時間來回答,並且HTTP請求不應該是相互依賴的;如果您需要執行需要相當長時間的計算,可以嘗試通過更改體系結構/模型/緩存/任何內容來加速它們,或者將其明確視爲可以通過HTTP接口控制的長時間運行作業這意味着「工作」是可以通過HTTP查詢的「物理資源」。您可以通過一個POST請求創建資源:

POST /tasks 
Content-Type: application/json 

{"some": "parameters", "go": "here"} 
{"resource": "/tasks/42"} 

然後你就可以查詢任務的狀態:

GET /tasks/42 
{"status": "pending"} 

,最終得到的結果:

GET /tasks/42 
{"status": "done", "results": [...]} 

當你帖子裏面新陳代謝一個新的任務,你的後臺可以取消以任何方式舊任務,它認爲合適的;該資源將返回「取消」或類似的狀態。開始新任務後,您的客戶將不會再次查詢舊資源。即使您的客戶端每秒查詢一次資源,它仍然會在服務器上使用更少的資源(一個連接在固定的10秒內打開,而10個連接在相同的時間範圍內打開200毫秒),特別是如果你對其應用一些智能緩存。由於您可以獨立於HTTP前端擴展任務後端,因此它的可擴展性更高,並且HTTP前端可以簡單地縮放到多個服務器和負載平衡器。

0

我會從@ Drizzt1991發佈的解決方案:

嘿,阿爾喬姆。你在那裏有一個奇怪的要求。 瞭解如果1個客戶端使用保持活動的套接字,在回答第一個請求之前,不可能看到下一個請求。這就是HTTP的工作方式,它期望在發送另一個請求之前的結果。 因此,如果客戶端將在2個獨立的套接字上運行,而您需要斷言來自同一個客戶端的2個套接字將在同一臺機器上路由,那麼您的情況纔會起作用。通過實踐,這對於故障轉移和其他東西並不是那麼有效。基本上它將是一個有狀態的API。 即使你這麼做,並非所有的圖書館都支持取消。傳統的關係數據庫只會忽略結果,但仍會處理待定查詢。如果你使用圖遍歷來做複雜的事情,並且你有很多步驟,那麼你可以取消它。

但是,如果你斷言,該客戶端使用套接字池,它們被路由到同一臺機器,並請求從取消中獲益,這樣的事情應該做的伎倆:

import asyncio 
import random 

from aiohttp import web 


def get_session_id(request): 
    # I don't know how you do session management, so left it out 
    return "" 


async def handle(request): 
    session_id = get_session_id(request) 
    request['tr_id'] = tr_id = int(random.random() * 1000000) 

    running_tasks = request.app['running_tasks'] 
    if session_id in running_tasks and not running_tasks[session_id].done(): 
     running_tasks[session_id].cancel() 
     del running_tasks[session_id] 

    current_task = asyncio.ensure_future(_handle_impl(request)) 
    running_tasks[session_id] = current_task 

    try: 
     resp = await current_task 
    except asyncio.CancelledError: 
     print("Cancelled request", tr_id) 
     resp = web.Response(text="Cancelled {}".format(tr_id)) 
    finally: 
     if running_tasks[session_id] is current_task: 
      del running_tasks[session_id] 
    return resp 


async def _handle_impl(request): 
    tr_id = request['tr_id'] 
    print("Start request", tr_id) 
    await asyncio.sleep(10) 
    print("Finished request", tr_id) 
    return web.Response(text="Finished {}".format(tr_id)) 


app = web.Application() 
app.router.add_get('/', handle) 
app.router.add_get('/{name}', handle) 

app['running_tasks'] = {} 

web.run_app(app, host="127.0.0.1", port=8080)