我每5分鐘從ElasticSearch讀取數據到Spark。所以每5分鐘會有一個RDD。如何從持續的RDD構建DStream?
我希望能夠基於這些RDD構建一個DStream,以便我可以在最近1天,最近1小時,最近5分鐘等內獲取數據報告。
爲了構建DStream,我一直在考慮創建自己的接收器,但spark的官方文檔只能使用scala或java來提供信息。我使用python。
那麼你知道有什麼辦法嗎?我知道我們可以。畢竟DStream是一系列RDD,當然我們應該從持續的RDD創建DStream。我只是不知道如何。請給出一些建議
謝謝@Mateusz Dymczyk。 queueStream將不起作用,因爲在創建DStream之後。任何添加到隊列中的新rdd都不會計入 –
@KramerLi啊你是對的,我想知道是否有某種方法可以改變這種行爲... –