2016-11-18 89 views
3

我有一個來自IoT應用程序的帶有JSON數據的Kafka代理。我從Spark Streaming應用程序連接到此服務器以執行一些處理。如何訪問Spark Streaming應用程序中的緩存數據?

我想保存在內存(RAM)我的JSON數據的一些特定領域,我相信我可以使用cache()persist()運營商實現。

下一次當我在Spark Streaming應用程序中接收到新的JSON數據時,我檢查內存(RAM)是否有可以檢索的公用字段。如果是的話,我做一些簡單的計算,最後我更新存儲在內存中的字段的值(RAM)。

因此,我想知道我之前所描述的是否有可能。如果是,我必須使用cache()還是persist()?我怎樣才能從記憶中檢索我的領域?

回答

2

有可能與使用內存或磁盤中的Spark中應用的數據cache/persist(不一定只有星火流媒體應用程序 - 它是caching in Spark更普遍使用)。

但是......在星火流你有這樣的使用情況被稱爲狀態計算特殊支持。請參閱Spark Streaming Programming Guide探索可能性。

我認爲你的使用案例mapWithState運營商正是你所追求的。

0

Spark不能這樣工作。請以分散的方式思考。

對於保持RAM的第一部分。您可以使用cache()persist()任何人,默認情況下他們將數據保存在工作人員的內存中。

您可以從Apache Spark代碼進行驗證。

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ 
    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) 

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ 
    def cache(): this.type = persist() 

據我瞭解你的用例,你需要UpdateStateByKey操作來實現你的第二個用例!

有關窗口的更多信息,請參閱here

相關問題