2015-10-05 82 views
1

我正在使用Scala編寫Spark流應用程序,其中我的目標是通過每秒讀取Twitter源來計算60秒窗口中最多轉發的狀態。使用reduceByKeyAndWindow()運行在Scala上的滑動窗口使用reduceByKeyAndWindow()

我在概念上想要的是在滑動窗口結束時獲取狀態轉發的數量,並在開始時從等效數字中減去它的數量,以便找到否。在窗口內轉推。代碼的相關行是:

val counts = tweets.filter(_.isRetweet).map { status => 
       (status.getText(), status.getRetweetedStatus().getRetweetCount()) 
      }.reduceByKeyAndWindow(*function*, Seconds(60), Seconds(1)) 

所以,我的問題是我應該使用什麼功能,在這裏達到預期的效果,那就是得到最大的價值,該窗口內getRetweetCount()回報,減去最小值它。

回答

0

糾正我,如果我錯了或在這裏作出錯誤的假設,但你基本上檢查Seconds(60)窗口內的狀態轉推的數量。要做到這一點,您已經擁有可以移除所有未轉推推文的過濾器(filter(_.isRetweet))。現在,您需要做的就是彙總轉發的狀態以確定其頻率。

這可以通過以下操作來實現:

val counts = tweets.filter(_.isRetweet).map { status => 
       (status.getText(), null) 
      }.countByValueAndWindow(Seconds(60), Seconds(1)) 

也許在此之後,你可以通過價值秩序,窗口內雲集最多轉推的tweets。

+0

這也是我的第一個想法,但後來我意識到,我正在閱讀的公共Twitter流只是給我一小部分全球流量,所以我錯過了很多推文。這就是爲什麼我使用getRetweetCount()來查看是否有錯過的轉發。所以我想要的是,在窗口內獲得轉推計數的最大值,並從中減去最小值,以確保我捕獲了所有內容。 – nikos