2017-10-08 58 views
0

能否請你幫我 - 我試圖使用Apache Flink與外部合奏/樹庫像XGBoost機器學習的任務,所以我的工作流程是這樣的:實現一堆阿帕奇弗林克並行應用到同一源流的變換和組合結果

  • 接收數據的單個流,其原子事件看起來像一個簡單的向量event=(X1, X2, X3...Xn)並且它可以被想象爲POJO字段所以最初我們有DataStream<event> source=...
  • 很多特徵提取碼應用於同一事件源: feature1 = source.map(X1...Xn)feature2 = source.map(X1...Xn)等c。爲了簡單起見,讓DataStream<int> feature(i) = source.map()所有功能
  • 然後我需要創建一個向量與提取功能(feature1, feature2, ...featureK)現在它將是40-50功能,但我相信它將包含更多的項目在未來和容易可以包含100 -500功能多
  • 把這些提取的特徵以數據集/表列通過進行10分鐘的窗口,並在這樣的運行10分鐘,最終的機器學習任務數據

在簡單的話,我需要申請幾個完全不同的map操作到流中相同的單個事件,然後將所有映射函數的結果合併到單個向量中。

因此,現在我無法弄清楚如果可能的話,如何實施最終減少步驟並運行parallel中的所有特徵提取map作業。我花了好幾天的弗林克文檔網站,YouTube視頻,谷歌搜索,閱讀Flink's來源,但它似乎我真的被困在這裏。這裏

的簡單的解決方案將是使用單map操作並且由一個在巨大map體運行的每個特徵提取代碼順序地一個,然後返回最終載體(Feature1...FeatureK)對於每個輸入事件。但它應該是瘋狂的和非最佳的。

對於每兩個特徵對另一種解決方案使用join因爲所有的功能數據流具有相同的初始事件和相同的密鑰和只適用於一些轉換代碼,但它看起來醜陋:寫50加入代碼一些window。我認爲,爲了加入來自不同來源的不同流,而不是針對這種地圖/縮減操作,開發了連接和組件。

至於我所有map操作這裏應該是簡單的一個東西,我很想念它。

可否請你指出我你們怎麼在Flink執行這樣的任務,如果可能的示例代碼?

謝謝!

+0

對於我來說,您的第一種方法似乎是最優的,即使您將有一個巨大的地圖操作可以提取單個數據樣本的所有特徵,但此過程將並行應用於所有進入的事件。無論如何將會並行處理。另外,在第二種方法中,整個連接過程將取決於最慢的特徵提取器,因此與第一個相比並不是最佳的。 –

+0

謝謝,有道理!另一種解決方案是使用像LMAX Disruptor一樣實施序列屏障的儀器。我已經使用Aeron作爲通信層,因此所有消息都先通過Aeron,然後他們來到Flink,將'map'計算結果再次放入環形緩衝區以加快吞吐量似乎是有意義的。當所有功能都將被計算 - 將結果打包並保存到最終表格。 –

+0

Flink方法更常見:'聯合(特徵1,特徵2 ...特徵K)'源,因此可以將流成像爲'f13 f21 f32 f41 f12 f11' - 來自所有事件和所有特徵部分的元素,然後使用'process函數將把所有無序的向量部分放到某個'state'中,並以正確的順序將完成的向量沖洗到'ctx.collect()',最後將指向'sink'功能。 –

回答

0

什麼是您希望每秒處理的事件數?如果它足夠高(~number of machines * number of cores),您應該很好地同時處理更多事件。不是使用多個特徵進行縮放,而是使用事件數量進行縮放。如果您擁有單個數據源,則在應用轉換之前,您仍然可以隨機隨機洗牌。

另一種解決方案可能是:

  1. 分配唯一的事件ID和拆分使用flatMap成元組的原始事件:<featureId, Xi, eventId>
  2. keyBy(featureId, eventId)(或者可以用shuffle()?做隨機分區?)。
  3. 執行轉換。
  4. keyBy(eventId, ...)
  5. 窗口並減少每個事件一個記錄。