How to append the last record of each change in Scala

I am new to Scala. What I have done so far is filter data out of a large dataset and print it as CSV. The CSV I print has the following format:

id   time        status 
___  _____       _________ 
1  2016-10-09 00:09:10     100 
1  2016-10-09 00:09:30     100 
1  2016-10-09 00:09:50     100 
1  2016-10-09 00:10:10     900 
2  2016-10-09 00:09:18     100 
2  2016-10-09 00:09:20     100 
2  2016-10-09 00:10:24     900 
3  2016-10-09 00:09:30     100 
3  2016-10-09 00:09:33     100 
3  2016-10-09 00:09:36     100 
3  2016-10-09 00:09:39     100 
3  2016-10-09 00:09:51     900 

I use the following code to print the data:

import scala.collection.mutable.ListBuffer

var count = 0
val StatusList = ListBuffer[String]()

for (currentRow <- sortedRow) {
  // keep every row whose status is 100
  if (currentRow.status == 100) {
    StatusList += s"${currentRow.id},${currentRow.time},${currentRow.status}"
  }
  // if the next row is the changed (900) record, append it as well
  if (count + 1 < sortedRow.size && sortedRow(count + 1).status == 900) {
    val next = sortedRow(count + 1)
    StatusList += s"${next.id},${next.time},${next.status}"
  }
  count += 1
}

With this I want to print the rows that have status 100 and append, to each of them, the record where the status changed. Basically, the data I want to print looks like this:

id  time    status id  change_time   status 
___  _____    _________ __ ______________  _______ 
1 2016-10-09 00:09:10  100  1  2016-10-09 00:10:10 900 
1 2016-10-09 00:09:30  100  1  2016-10-09 00:10:10 900 
1 2016-10-09 00:09:50  100  1  2016-10-09 00:10:10 900 
2 2016-10-09 00:09:18  100  2  2016-10-09 00:10:24 900 
2 2016-10-09 00:09:20  100  2  2016-10-09 00:10:24 900 
3 2016-10-09 00:09:30  100  3  2016-10-09 00:09:51 900 
3 2016-10-09 00:09:33  100  3  2016-10-09 00:09:51 900 
3 2016-10-09 00:09:36  100  3  2016-10-09 00:09:51 900 
3 2016-10-09 00:09:39  100  3  2016-10-09 00:09:51 900 
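For reference, a minimal plain-Scala sketch of this pairing (the Record case class and the pairWithChange name are hypothetical, used only for illustration; the answer below takes a Spark dataframe approach instead) could look like:

case class Record(id: Int, time: String, status: Int)

def pairWithChange(rows: Seq[Record]): Seq[String] =
  rows.groupBy(_.id).toSeq.sortBy(_._1).flatMap { case (_, group) =>
    // the record where the status changed for this id, if any
    val changed = group.find(_.status == 900)
    group.filter(_.status == 100).map { r =>
      val suffix = changed.map(c => s",${c.id},${c.time},${c.status}").getOrElse("")
      s"${r.id},${r.time},${r.status}$suffix"
    }
  }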

You could split the two statuses into two csvs, but what is the rule for appending? Is it random, or is there a strict rule? –


I can split them, but for further analysis I need to keep the format above – Ricky


You did not read my question carefully. I asked what the rule for this combination is. –

Answer


I suggest a solution using dataframes, which are an optimized and improved way of working compared with RDDs.

I assume the data is in a file, with a header line, in the following format:

id,time,status 
1,2016-10-0900:09:10,100 
1,2016-10-0900:09:30,100 
1,2016-10-0900:09:50,100 
1,2016-10-0900:10:10,900 

The first step is to read the file into a dataframe using sqlContext:

val sqlContext = sparkSession.sqlContext 
val dataframe = sqlContext.read.format("csv").option("header", "true").load("absolute path to the input file") 
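The snippet above assumes a SparkSession called sparkSession already exists; a minimal sketch of creating one (the app name and master below are placeholders, not part of the answer) could be:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .appName("status-change-join")  // placeholder application name
  .master("local[*]")             // placeholder; point this at your cluster in practice
  .getOrCreate()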

You should then have the dataframe as:

+---+------------------+------+ 
|id |time    |status| 
+---+------------------+------+ 
|1 |2016-10-0900:09:10|100 | 
|1 |2016-10-0900:09:30|100 | 
|1 |2016-10-0900:09:50|100 | 
|1 |2016-10-0900:10:10|900 | 
|2 |2016-10-0900:09:18|100 | 
|2 |2016-10-0900:09:20|100 | 
|2 |2016-10-0900:10:24|900 | 
|3 |2016-10-0900:09:30|100 | 
|3 |2016-10-0900:09:33|100 | 
|3 |2016-10-0900:09:36|100 | 
|3 |2016-10-0900:09:39|100 | 
|3 |2016-10-0900:09:51|900 | 
+---+------------------+------+ 

The next step is to filter the dataframe into two, based on the status:

val df1 = dataframe.filter(dataframe("status") === "100") 

The output is:

+---+------------------+------+ 
|id |time    |status| 
+---+------------------+------+ 
|1 |2016-10-0900:09:10|100 | 
|1 |2016-10-0900:09:30|100 | 
|1 |2016-10-0900:09:50|100 | 
|2 |2016-10-0900:09:18|100 | 
|2 |2016-10-0900:09:20|100 | 
|3 |2016-10-0900:09:30|100 | 
|3 |2016-10-0900:09:33|100 | 
|3 |2016-10-0900:09:36|100 | 
|3 |2016-10-0900:09:39|100 | 
+---+------------------+------+ 
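Equivalently, with the session implicits imported, the column can be referenced with the $ syntax; this is only an alternative spelling of the same filter, not a required step:

import sparkSession.implicits._

val df1 = dataframe.filter($"status" === "100")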

Do the same for status 900 as df2, with the column names renamed:

val df2 = dataframe.filter(dataframe("status") === "900") 
    .withColumnRenamed("id", "id2") 
    .withColumnRenamed("time", "changed_time") 
    .withColumnRenamed("status", "status2") 

The output should be:

+---+------------------+-------+ 
|id2|changed_time  |status2| 
+---+------------------+-------+ 
|1 |2016-10-0900:10:10|900 | 
|2 |2016-10-0900:10:24|900 | 
|3 |2016-10-0900:09:51|900 | 
+---+------------------+-------+ 

The final step is to join the two dataframes:

val finalDF = df1.join(df2, df1("id") === df2("id2"), "left") 

The final output is:

+---+------------------+------+---+------------------+-------+ 
|id |time    |status|id2|changed_time  |status2| 
+---+------------------+------+---+------------------+-------+ 
|1 |2016-10-0900:09:10|100 |1 |2016-10-0900:10:10|900 | 
|1 |2016-10-0900:09:30|100 |1 |2016-10-0900:10:10|900 | 
|1 |2016-10-0900:09:50|100 |1 |2016-10-0900:10:10|900 | 
|2 |2016-10-0900:09:18|100 |2 |2016-10-0900:10:24|900 | 
|2 |2016-10-0900:09:20|100 |2 |2016-10-0900:10:24|900 | 
|3 |2016-10-0900:09:30|100 |3 |2016-10-0900:09:51|900 | 
|3 |2016-10-0900:09:33|100 |3 |2016-10-0900:09:51|900 | 
|3 |2016-10-0900:09:36|100 |3 |2016-10-0900:09:51|900 | 
|3 |2016-10-0900:09:39|100 |3 |2016-10-0900:09:51|900 | 
+---+------------------+------+---+------------------+-------+ 
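The tables shown in this answer are the kind of output a call like the following would print (false just turns off column truncation); it is only for inspection and is not needed for the CSV output:

finalDF.show(false)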

Saving the final dataframe to a csv file is easy as well:

finalDF.write.format("csv").save("absolute path to output filename ") 
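Note that Spark writes the output as a directory of part files rather than a single file; if one CSV file with a header line is needed (and the result is small enough to fit in one partition), a sketch would be:

finalDF.coalesce(1)
  .write
  .option("header", "true")
  .format("csv")
  .save("absolute path to output directory")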