我正在尋找一種功能強大且快速的方式來處理Google App Engine中大文件的處理。如何處理GAE上的大文件處理?
它的工作原理如下(簡化的工作流程,在年底):
- 客戶發送一個CSV文件,我們的服務器將通過線治療,行。
- 文件上傳完成後,會在NDB數據存儲
Uploads
中添加一個條目,其中包含CSV名稱,文件路徑(至Google存儲)以及一些基本信息。然後,創建一個任務,稱爲「預處理」。 - 預處理任務將在CSV文件的所有行上循環(可能是數百萬),並且會爲每個行添加一個NDB條目到
UploadEntries
模型,其中包含CSV ID,行,要提取/如果該行已經開始處理,並且結束處理(「is_treating」,「is_done」) - 一旦預處理任務結束,它就更新客戶端的信息「XXX行將被處理「
- 致電
Uploads.next()
。該next
方法:- 搜索已經以虛假
is_treating
和is_done
的UploadEntries
, - 將在Redis的數據存儲添加一個任務找到下一行。 (使用Redis數據存儲是因爲這裏的工作是在不由Google管理的服務器上進行的)
- 也會在任務中創建新條目
Process-healthcheck
(此任務在5分鐘後運行並檢查7)是否已正確執行。如果不是,則認爲Redis/Outside服務器發生故障並且執行與7)相同的操作,但沒有結果(而是使用「error」))。 - 然後,它將
UploadEntries.is_treating
更新爲該條目的True。
- 搜索已經以虛假
- 外部服務器將處理數據,並通過向服務器上的端點發出POST請求返回結果。
- 該端點更新數據存儲中的
UploadEntries
條目(包括「is_treating
」和「is_done
」),並調用Uploads.next()
以啓動下一行。 - 在Uploads.next中,搜索下一個條目時不會返回任何內容,我認爲該文件將被最終處理,並調用任務
post-process
,該任務將使用處理後的數據重建CSV,並將其返回給客戶。
這裏有幾件事情要記住:
- ,做真正的工作是谷歌的AppEngine之外的服務器上,這就是爲什麼我不得不拿出Redis的。
- 目前的做事方式給了我要處理的並行條目數量的靈活性:在5)中,
Uploads.next()
方法包含一個limit
參數,它讓我可以並行搜索並行進程。可以是1,5,20,50。 - 在這種情況下,我不能直接將
pre-processing
任務中的所有行直接添加到Redis becase中,下一位客戶將不得不等待第一個文件完成處理,這將會花費太長的時間
但該系統具有的各種問題,這就是爲什麼我轉向你的幫助:
- 有時候,這個系統是如此之快,數據存儲是沒有正確時更新調用
Uploads.next()
,條目返回a (只是entry.is_treating = True
還沒有被推送到數據庫) - Redis或我的服務器(我真的不知道)有時會丟失任務,或者沒有進行處理後的POST請求,所以任務永遠不會去
is_done = True
。這就是爲什麼我必須實施Healcheck系統,以確保無論如何都能正確處理生產線。這具有雙重優勢:該任務的名稱包含csv ID和行。每個文件都是獨一無二的。如果數據存儲不是最新的並且同一任務運行兩次,則健康檢查的創建將失敗,因爲已存在相同的名稱,讓我知道存在併發問題,所以我忽略該任務,因爲它意味着數據存儲尚未更新。
我initiall至想過通過線一個獨立的進程文件,行,但這並沒有能夠並行運行多個線的大的缺點。此外,谷歌將任務的執行時間限制在24小時內,而不是默認值,當文件非常大時,它可以運行超過24小時。
的信息,如果有幫助,我使用Python的
,並簡化工作流程,這裏就是我試圖以儘可能最好的方式來實現:
- 處理一個大文件,運行多個paralllel進程,每行一個。
- 使用Redis將工作發送到外部服務器。一旦這樣做,是對外部服務器通過POST請求到主服務器的
- 主服務器,然後更新有關該行的信息返回結果,並進入到下一行
我真的很感激,如果有人有一個更好的方式來做到這一點。我真的相信我不是第一個做這種工作的人,我很確定我沒有做正確的工作。我相信Stackoverflow是Stack Exchange最好的部分,因爲它是一個算法問題,但它也有可能我沒有看到更好的網絡。如果是這樣,我很抱歉那)。
我想你可以使用app engine mapreduce來做這件事,它可以從GCS逐行讀取CSV格式的緩衝區,並在多個實例上運行它。它會根據您的設置處理每個請求的N行。但是,GAE實例很昂貴。 –