
Finding duplicate records for a field in a Spark RDD

I have a dataset like:

10,"Name",2016,"Country"
11,"Name1",2016,"country1"
10,"Name",2016,"Country"
10,"Name",2016,"Country"
12,"Name2",2017,"Country2"

My problem statement: I have to find the total number of records and the number of duplicates per year. My result should be (year, totalrecords, duplicates):

2016,4,3
2017,1,0

I tried to solve it with:

val records = rdd.map { x =>
  val array = x.split(",")
  (array(2), x)              // key each line by its year (third field)
}.groupByKey()

val duplicates = records.map { x =>
  val totalcount = x._2.size
  val duplicates = ???       // find duplicates in the iterator
  (x._1, totalcount, duplicates)
}

This works fine for up to about 10GB of data, but on larger data it takes a very long time. I found that groupByKey is not the best approach here.

Please suggest the best approach to solve this problem.
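For reference, one way to avoid materializing every row per year (which is what groupByKey does) is to count occurrences of each full record with reduceByKey first and then aggregate those counts per year. The following is only a minimal sketch against the comma-separated lines shown above; it assumes the same rdd: RDD[String] as in the question, that the year is the third field, and that "duplicates" means every row whose full record occurs more than once, which is what the expected output 2016,4,3 suggests.

import org.apache.spark.rdd.RDD

// Sketch: (year, totalRecords, duplicates) without groupByKey.
// Assumes lines like 10,"Name",2016,"Country" with the year in the third field,
// and that "duplicates" counts every row whose full record occurs more than once.
def yearStats(rdd: RDD[String]): RDD[(String, Long, Long)] =
  rdd
    .map(line => (line, 1L))                      // occurrences of each full record
    .reduceByKey(_ + _)
    .map { case (line, cnt) =>
      val year = line.split(",")(2)
      (year, (cnt, if (cnt > 1) cnt else 0L))     // (records, records that are duplicated)
    }
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .map { case (year, (total, dups)) => (year, total, dups) }

On the sample data this would yield (2016,4,3) and (2017,1,0); both reduceByKey steps combine partial counts on the map side, so only one (record, count) pair per distinct record is shuffled rather than every row.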

Answer


I'm not enough of a SQL expert to count duplicates exactly the way your example shows, but I think this will get you started with DataFrames. My understanding is that DataFrames can perform better than raw RDDs.

scala> import com.databricks.spark.csv._ 
import com.databricks.spark.csv._ 

scala> 

scala> val s = List("""10,"Name",2016,"Country"""", """11,"Name1",2016,"country1"""", """10,"Name",2016,"Country"""", """10,"Name",2016,"Country"""", """12,"Name2",2017,"Country2"""") 
s: List[String] = List(10,"Name",2016,"Country", 11,"Name1",2016,"country1", 10,"Name",2016,"Country", 10,"Name",2016,"Country", 12,"Name2",2017,"Country2") 

scala> val rdd = sc.parallelize(s) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[92] at parallelize at <console>:38 

scala> 

scala> val df = new CsvParser().withDelimiter(',').withInferSchema(true).withParseMode("DROPMALFORMED").csvRdd(sqlContext, rdd) 
df: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> 

scala> df.registerTempTable("test") 

scala> 

scala> val dfCount = sqlContext.sql("select C2, count(*), count(distinct C0,C2,C1,C3) from test group by C2") 
dfCount: org.apache.spark.sql.DataFrame = [C2: int, _c1: bigint, _c2: bigint] 

scala> 

scala> dfCount.show 
+----+---+---+                 
| C2|_c1|_c2| 
+----+---+---+ 
|2016| 4| 2| 
|2017| 1| 1| 
+----+---+---+
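
Note that the distinct count above (2 for 2016) is not quite the duplicate count asked for in the question (3 for 2016). If "duplicates" means every row whose full record occurs more than once, one way to express that is to count per full record in a subquery and then aggregate per year. This is only an untested sketch against the "test" temp table and the inferred C0..C3 column names from the session above:

// Sketch: duplicates = rows whose full record occurs more than once,
// using the "test" temp table and inferred columns C0..C3 from above.
val dfDup = sqlContext.sql("""
  select C2,
         sum(cnt) as total,
         sum(case when cnt > 1 then cnt else 0 end) as duplicates
  from (select C0, C1, C2, C3, count(*) as cnt
        from test
        group by C0, C1, C2, C3) grouped
  group by C2
""")
dfDup.show()

On the sample rows this should give 2016 -> (4, 3) and 2017 -> (1, 0), matching the expected output in the question.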