
I am trying to reduce time-series data so that values occurring within the same 1-hour window are collected into arrays (in order to detect the max, min, and average), i.e. a Spark reduceByKey that only reduces under certain conditions.

It does not look like I can provide a condition inside the reduce block that decides whether the reduce should happen (the value gets added to the array) or be skipped.

// data format: ID, VAL, DATETIME
tvFile.map((x) =>
    (x.split(',')(0), (Array(x.split(',')(1)), Array(x.split(',')(2))))) // (ID, ([VAL], [DATETIME]))
  .reduceByKey((a, b) => {
    val dt1 = DateTime.parse(a._2(0))
    val dt2 = DateTime.parse(b._2(0))
    if ((dt1.getDayOfYear == dt2.getDayOfYear) && (dt1.getHourOfDay == dt2.getHourOfDay))
      (a._1 ++ b._1, a._2 ++ b._2)
    else
      // NOT SURE WHAT TO DO HERE
  }).collect

The above is probably not the most efficient or correct approach; I am just getting started with Spark/Scala.

Answer


The approach should be to prepare the data with a key that partitions it for aggregation. Following the code in the question, the key in this case should be (id, day-of-year, hour-of-day).

Once the data is keyed correctly, the aggregation is trivial.

Example:

val sampleData = Seq("p1,38.1,2016-11-26T11:15:10", 
         "p1,39.1,2016-11-26T11:16:10", 
         "p1,35.8,2016-11-26T11:17:10", 
         "p1,34.1,2016-11-26T11:18:10", 
         "p2,37.2,2016-11-26T11:16:00", 
         "p2,31.2,2016-11-27T11:17:00", 
         "p2,31.6,2016-11-27T11:17:00", 
         "p1,39.4,2016-11-26T12:15:10", 
         "p2,36.3,2016-11-27T10:10:10", 
         "p1,39.5,2016-11-27T12:15:00", 
         "p3,36.1,2016-11-26T11:15:10") 

import org.joda.time.DateTime  // for DateTime.parse / getDayOfYear / getHourOfDay below

val sampleDataRdd = sparkContext.parallelize(sampleData)

val records = sampleDataRdd.map{line => 
          val parts = line.split(",") 
          val id = parts(0) 
          val value = parts(1).toDouble 
          val dateTime = DateTime.parse(parts(2)) 
          val doy = dateTime.getDayOfYear 
          val hod = dateTime.getHourOfDay 
          ((id, doy, hod), value) 
          } 

val aggregatedRecords = records.reduceByKey(_ + _)        
aggregatedRecords.collect 
// Array[((String, Int, Int), Double)] = Array(((p1,331,11),147.10000000000002), ((p2,332,11),62.8), ((p2,331,11),37.2), ((p1,332,12),39.5), ((p2,332,10),36.3), ((p1,331,12),39.4), ((p3,331,11),36.1)) 
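Since the question's goal was to detect the max, min, and average per hour, the same keyed records RDD can feed an aggregation that keeps those statistics instead of a plain sum. Below is a minimal sketch using aggregateByKey; the HourStats case class is a hypothetical helper introduced only for this example and is not part of the original answer.

// Sketch: per-hour min/max/mean on top of the keyed `records` RDD.
// HourStats is an illustrative helper, not from the original answer.
case class HourStats(min: Double, max: Double, sum: Double, count: Long) {
  def mean: Double = sum / count
}

val hourlyStats = records.aggregateByKey(HourStats(Double.MaxValue, Double.MinValue, 0.0, 0L))(
  // merge one value into the accumulator within a partition
  (s, v) => HourStats(math.min(s.min, v), math.max(s.max, v), s.sum + v, s.count + 1),
  // merge two accumulators across partitions
  (s1, s2) => HourStats(math.min(s1.min, s2.min), math.max(s1.max, s2.max),
                        s1.sum + s2.sum, s1.count + s2.count)
)

hourlyStats.collect.foreach { case ((id, doy, hod), s) =>
  println(s"$id day=$doy hour=$hod min=${s.min} max=${s.max} mean=${s.mean}")
}

Tracking (min, max, sum, count) in a single pass avoids materializing the per-hour arrays from the question, which keeps the shuffle payload small.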

This is also a lot easier with Spark DataFrames. I answered with the RDD API given how the question was phrased.
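For reference, a rough sketch of what the DataFrame version could look like (assuming Spark 2.x, a SparkSession named spark, and a hypothetical CSV file tv_data.csv with the same ID,VAL,DATETIME layout; column names are illustrative):

import org.apache.spark.sql.functions._

val df = spark.read
  .csv("tv_data.csv")                                   // hypothetical path, no header row
  .toDF("id", "value", "datetime")
  .select(
    col("id"),
    col("value").cast("double").as("value"),
    to_timestamp(col("datetime"), "yyyy-MM-dd'T'HH:mm:ss").as("ts"))

df.groupBy(col("id"), dayofyear(col("ts")).as("doy"), hour(col("ts")).as("hod"))
  .agg(min("value").as("min"), max("value").as("max"), avg("value").as("avg"))
  .show()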


@tamersalama It is also available as a notebook: https://gist.github.com/maasg/e470654d15a73a1cc1a280e37561a8a5 – maasg