2016-06-07 199 views
1

我有一個Spark數據流進程,它將kafka, 的數據讀入DStream。在Spark Streaming中緩存DStream

在我的管道我做兩次(陸續):

DStream.foreachRDD(RDD上的轉換和插入到目的地)。

(每次我做不同的處理和插入數據到不同的目的地)。

我想知道DStream.cache在從卡夫卡工作中讀取數據後會如何?可以做到嗎?

該過程現在是否實際上從卡夫卡讀取數據兩次?

請記住,這是不可能放兩個foreachRDDs成一個(因爲兩條路徑有很大的不同,也有有狀態的轉變存在 - 這需要對DSTREAM被appliend ...)

謝謝您的幫助

+0

Dstream.cache將工作。它在第一次看到某個動作時緩存該流。對於DStream中的後續操作,它使用緩存。 – Knight71

+0

@ Knight71當DStream不再需要時,我還需要設置DStream.unpersist(true),與緩存RDD時一樣? –

+0

Dstream數據將在所有操作完成後自動清除,並且基於轉換由火花流確定。 – Knight71

回答

3

這裏有兩種選擇:

  • 使用Dstream.cache()的緩存,以紀念底層RDDS。 Spark Streaming將負責在spark.cleaner.ttl配置控制的超時後暫停RDD。

  • 使用額外foreachRDD申請cache()unpersist(false)影響的操作到RDDS在DSTREAM:

如:

val kafkaDStream = ??? 
val targetRDD = kafkaRDD 
         .transformation(...) 
         .transformation(...) 
         ... 
// Right before the lineage fork mark the RDD as cacheable: 
targetRDD.foreachRDD{rdd => rdd.cache(...)} 
targetRDD.foreachRDD{do stuff 1} 
targetRDD.foreachRDD{do stuff 2} 
targetRDD.foreachRDD{rdd => rdd.unpersist(false)} 

請注意,您可以納入緩存爲第一如果這是一個選項do stuff 1陳述。

我更喜歡這個選項,因爲它使我能夠對緩存生命週期進行細粒度的控制,並且可以在需要時立即清理內容,而不是依賴於ttl。

+0

'''spark.cleaner.ttl'''被刪除。這是什麼新的財產控制? – okwap