0

我正在尋找一種功能強大且快速的方式來處理Google App Engine中大文件的處理。如何處理GAE上的大文件處理?

它的工作原理如下(簡化的工作流程,在年底):

  1. 客戶發送一個CSV文件,我們的服務器將通過線治療,行。
  2. 文件上傳完成後,會在NDB數據存儲Uploads中添加一個條目,其中包含CSV名稱,文件路徑(至Google存儲)以及一些基本信息。然後,創建一個任務,稱爲「預處理」。
  3. 預處理任務將在CSV文件的所有行上循環(可能是數百萬),並且會爲每個行添加一個NDB條目到UploadEntries模型,其中包含CSV ID,行,要提取/如果該行已經開始處理,並且結束處理(「is_treating」,「is_done」)
  4. 一旦預處理任務結束,它就更新客戶端的信息「XXX行將被處理「
  5. 致電Uploads.next()。該next方法:
    • 搜索已經以虛假is_treatingis_doneUploadEntries
    • 將在Redis的數據存儲添加一個任務找到下一行。 (使用Redis數據存儲是因爲這裏的工作是在不由Google管理的服務器上進行的)
    • 也會在任務中創建新條目Process-healthcheck(此任務在5分鐘後運行並檢查7)是否已正確執行。如果不是,則認爲Redis/Outside服務器發生故障並且執行與7)相同的操作,但沒有結果(而是使用「error」))。
    • 然後,它將UploadEntries.is_treating更新爲該條目的True。
  6. 外部服務器將處理數據,並通過向服務器上的端點發出POST請求返回結果。
  7. 該端點更新數據存儲中的UploadEntries條目(包括「is_treating」和「is_done」),並調用Uploads.next()以啓動下一行。
  8. 在Uploads.next中,搜索下一個條目時不會返回任何內容,我認爲該文件將被最終處理,並調用任務post-process,該任務將使用處理後的數據重建CSV,並將其返回給客戶。

這裏有幾件事情要記住:

  1. ,做真正的工作是谷歌的AppEngine之外的服務器上,這就是爲什麼我不得不拿出Redis的。
  2. 目前的做事方式給了我要處理的並行條目數量的靈活性:在5)中,Uploads.next()方法包含一個limit參數,它讓我可以並行搜索並行進程。可以是1,5,20,50。
  3. 在這種情況下,我不能直接將pre-processing任務中的所有行直接添加到Redis becase中,下一位客戶將不得不等待第一個文件完成處理,這將會花費太長的時間

但該系統具有的各種問題,這就是爲什麼我轉向你的幫助:

  1. 有時候,這個系統是如此之快,數據存儲是沒有正確時更新調用Uploads.next(),條目返回a (只是entry.is_treating = True還沒有被推送到數據庫)
  2. Redis或我的服務器(我真的不知道)有時會丟失任務,或者沒有進行處理後的POST請求,所以任務永遠不會去is_done = True。這就是爲什麼我必須實施Healcheck系統,以確保無論如何都能正確處理生產線。這具有雙重優勢:該任務的名稱包含csv ID和行。每個文件都是獨一無二的。如果數據存儲不是最新的並且同一任務運行兩次,則健康檢查的創建將失敗,因爲已存在相同的名稱,讓我知道存在併發問題,所以我忽略該任務,因爲它意味着數據存儲尚未更新。

我initiall至想過通過線一個獨立的進程文件,行,但這並沒有能夠並行運行多個線的大的缺點。此外,谷歌將任務的執行時間限制在24小時內,而不是默認值,當文件非常大時,它可以運行超過24小時。

的信息,如果有幫助,我使用Python的


,並簡化工作流程,這裏就是我試圖以儘可能最好的方式來實現:

  • 處理一個大文件,運行多個paralllel進程,每行一個。
  • 使用Redis將工作發送到外部服務器。一旦這樣做,是對外部服務器通過POST請求到主服務器的
  • 主服務器,然後更新有關該行的信息返回結果,並進入到下一行

我真的很感激,如果有人有一個更好的方式來做到這一點。我真的相信我不是第一個做這種工作的人,我很確定我沒有做正確的工作。我相信Stackoverflow是Stack Exchange最好的部分,因爲它是一個算法問題,但它也有可能我沒有看到更好的網絡。如果是這樣,我很抱歉那)。

+1

我想你可以使用app engine mapreduce來做這件事,它可以從GCS逐行讀取CSV格式的緩衝區,並在多個實例上運行它。它會根據您的設置處理每個請求的N行。但是,GAE實例很昂貴。 –

回答

1

,它真正的工作是谷歌的AppEngine之外

你有沒有考慮過使用Google Cloud Dataflow的不是處理大型文件的服務器? 這是一個託管服務,將爲您處理文件分割和處理。

基於這裏最初的想法是一個大綱過程:

  • 用戶上傳文件直接到谷歌雲存儲,使用signed urls或Blob存儲API
  • 從AppEngine上的請求啓動該啓動一個小的計算引擎實例阻止請求(BlockingDataflowPipelineRunner)啓動數據流任務。 (恐怕它需要成爲一個計算實例,因爲沙箱和阻塞I/O問題)。
  • 數據流任務完成後,計算引擎實例將被解除阻塞並將消息發佈到pubsub中。
  • pubsub消息調用AppEngine服務上的webhook,該服務將任務狀態從「進行中」更改爲「完成」,以便用戶可以獲取其結果。
+0

真棒,Dataflow看起來適合我。你知道它是否適用於輸入是XLS/XLSX,它與我開始(所以我不能只讀行)? –

+0

您可能需要製作自定義來源。用於開發自定義文件讀取器的[便利類](https://cloud.google.com/dataflow/model/custom-io-python#convenience-source-base-classes)。也許這可以與[用於閱讀XLS的許多python庫之一](http://www.python-excel.org/)結合使用。 –

+0

好的,這是我懷疑。我開始閱讀Dataflow背後的想法,但仍有一個問題困擾着我:我如何應用需要在外部服務器上等待回覆的'Transform'方法(我相信自己做的)?我是否會阻止當前進程並每隔n秒查詢一次響應的狀態,還是有更好的方法? –