2016-07-25 62 views
3

我試圖解決以下問題:在谷歌應用程序引擎上使用任務隊列時,如何優先處理任務?

  1. 我有一系列的「任務」,我想執行
  2. 我有一個固定數量的工人來執行這些工人(因爲他們叫使用urlfetch的外部API和並行調用這個API的數量是有限的)
  3. 我想爲這些「任務」儘快執行(即最小延遲)
  4. 這些任務是部分可以根據原始任務的大小進行分類(即一個小的原始任務可能產生1到100個任務s,100到1000的中等和1000以上的大)。

棘手的部分:我想盡可能有效地做到這一點(即最小延遲和儘可能多的並行API調用 - 沒有超過限制),但同時嘗試防止大從「大型」原始任務生成的任務數量,以延遲從「小型」原始任務生成的任務。換句話說:我希望爲具有較高優先級的「小」任務的每個任務分配一個「優先級」,從而防止從「大」任務中匱乏。

一些摸索似乎並沒有表明任何預先製作是可用的,所以我想出了以下內容:

  • 創建三個推送隊列:tasks-smalltasks-mediumtasks-large
  • 集併發請求的最大數量,以使總數爲併發API調用的最大數量(例如,如果最大併發API調用數量爲200,則可以設置tasks-small以使max_concurrent_requests爲30,tasks-medium60和tasks-large 100)
  • 排隊任務時,請檢查編號。 (使用類似QueueStatistics類的待處理任務),並且如果其他隊列未被100%利用,則將任務排隊到那裏,否則只需將隊列排入具有相應大小的隊列中。

例如,如果我們有任務T1這是一個小任務的一部分,首先檢查tasks-small有免費的「槽」和排隊在那裏。否則檢查tasks-mediumtasks-large。如果它們都沒有空閒插槽,則無論如何都將其排入tasks-small,並在處理它之前添加任務之後處理它(注意:這不是最佳的,因爲如果其他隊列上的「插槽」釋放,它們仍然不會tasks-small隊列中的進程掛起任務)

另一種選擇是使用PULL隊列,並根據優先級從該隊列中拉出一箇中心「協調器」並分配它們,但似乎會增加一點延遲。

但是,這似乎有點hackish,我想知道是否有更好的選擇。


編輯:之後的一些想法和反饋我想通過以下方式使用拉入隊列畢竟:

  • 有兩個拉入隊列(medium-taskslarge-tasks
  • 有一個調度員(PUSH)隊列併發性爲1(因此只有一個調度任務在任何時候運行)。調度任務以多種方式發佈:
    • 通過一次一分鐘的cron作業
    • 工人的任務後增加一箇中型/大型任務推送隊列
    • 後完成
  • 有職工(PUSH)隊列併發等於工人

的數量和工作流程:

  • 小任務直接添加到工作隊列
  • 調度任務,無論何時被觸發,執行以下操作:
    • 估計自由工人的數量(通過查看正在運行的任務數工作者隊列)
    • 對於任何「空閒」時隙,它從中等/大型任務PULL隊列中抽取一個任務並將其排入工作者(或更準確地說:將其添加到工作者PUSH隊列中,這將導致它被執行 - 最終 - 在工人身上)。

一旦這樣實施,至少是適度測試我馬上彙報。

回答

0

編輯:我現在遷移到一個簡單的解決方案,類似於@埃裏克 - 西蒙頓所描述的:

  • 我有多個拉入隊列,每個優先級
  • 許多工人在端點上拉(處理程序)
  • 處理程序生成一個隨機數並執行一個簡單的「如果小於0.6,先嚐試小隊列,然後是大隊列,反之亦然(大小然後)」
  • 如果工人沒有得到任務或者出現錯誤,他們會進行半隨機指數回退,直到最大時間(即。他們開始每隔1秒拉動一次,並且在每次空拉動後大約加倍超過30秒)

這個最後一點是需要的 - 除其他原因外 - 因爲從PULL隊列中拉/秒的次數限制爲10K /秒:https://cloud.google.com/appengine/docs/python/taskqueue/overview-pull#Python_Leasing_tasks


我實現在UPDATE描述的解決方案:

  • 兩個上拉隊列(中任務和大任務)
  • 一個調度程序(PUSH)隊列的1
  • 工人(PUSH)隊列中的併發性等於工人

的數目的併發詳情見的問題。一些注意事項:

  • 存在任務的知名度一些延遲,由於最終一致性(即調度任務,有時沒有看到,即使它們被插入一起拉隊列中的任務。) - 我工作圍繞加入對調度程序任務進行倒計時5秒,並添加一個cron作業,每分鐘添加一個調度程序任務(因此,如果原始調度程序任務不從「拉」隊列「看到」任務,另一個將在稍後出現)
  • 確保命名每個任務以消除雙重調度的可能性
  • 您不能從PULL隊列中租用0個項目:-)
  • 批處理操作有一個上限,所以你必須做批處理taskqueue調用
  • 你自己的批處理似乎沒有一種方式編程方式獲得隊列的「最大並行性」值,所以我不得不硬編碼在調度(計算多少任務,它可以調度)
  • 不添加調度的任務,如果他們已經有一些(至少10個),在隊列中
1

小型/中型/大型原始任務隊列本身幫助不大 - 一旦原始任務排入隊列,它們將保留產卵工作任務,甚至可能破壞工作任務隊列大小限制。所以你需要調整/控制原始任務的排隊。

我跟蹤的「待辦事項」原始任務數據存儲/ GCS且僅當相應的隊列大小是足夠低的排隊這些原始任務(1或也許2未決作業),從任一個重複任務,cron作業或延遲任務(取決於您需要執行原始任務排隊的速率),它將像推送隊列調度程序一樣實現所需的步調和優先級邏輯,但沒有您提到的額外延遲。

1

我沒有使用拉隊列,但從我的理解他們可以很好地適合您的使用情況。你可以定義3個拉式隊列,並且有X工作人員都從他們那裏抽取任務,首先嚐試「小」隊列,如果隊列爲空(其中X是最大併發),則移至「中等」隊列。你不應該需要一箇中央調度員。

但是,即使沒有任務(或X/threadsPerMachine?),您仍然需要支付X工作人員的費用,或者自行將其縮小爲&。

所以,這裏有另一個想法:使用正確的maximum concurrency作出單個推隊列。當您收到一個新任務時,將其信息推送到數據存儲區並排隊等待通用作業。然後該通用作業將查詢數據存儲以優先順序查找任務,執行找到的第一個任務。這樣,即使該作業已經從大型任務中排隊,短期任務仍將由下一個作業執行。

相關問題