2016-03-15 98 views
0

我每5分鐘從ElasticSearch讀取數據到Spark。所以每5分鐘會有一個RDD。如何從持續的RDD構建DStream?

我希望能夠基於這些RDD構建一個DStream,以便我可以在最近1天,最近1小時,最近5分鐘等內獲取數據報告。

爲了構建DStream,我一直在考慮創建自己的接收器,但spark的官方文檔只能使用scala或java來提供信息。我使用python。

那麼你知道有什麼辦法嗎?我知道我們可以。畢竟DStream是一系列RDD,當然我們應該從持續的RDD創建DStream。我只是不知道如何。請給出一些建議

回答

1

寫你自己的接收器將是你提到的一種方式,但看起來像很多開銷。你可以做的是使用QueueReceiver,它創建QueueInputDStreamthis example一樣。這是斯卡拉,但你也應該能夠做類似的事情在Python:

val rddQueue = new Queue[RDD[Map[String, Any]]]() 
val inputStream = ssc.queueStream(rddQueue) 

之後,您只需要查詢您的ES實例的每個X sec/min/h/day/whatever,你把結果放到該隊列。

使用Python我想這將是這樣的:

rddQueue = [] 
rddQueue += es_rdd() // method that returns an RDD from ES 
inputStream = ssc.queueStream(rddQueue) 

// some kind of loop that adds to rddQueue new RDDS 

顯然,你需要有東西在排隊,你在pyspark使用它裏面queueStream(或至少我得到異常之前,如果它是空)。

+0

謝謝@Mateusz Dymczyk。 queueStream將不起作用,因爲在創建DStream之後。任何添加到隊列中的新rdd都不會計入 –

+0

@KramerLi啊你是對的,我想知道是否有某種方法可以改變這種行爲... –