
I have the following data that I need to sort through using Spark (Scala): I only need the IDs of people who visited "Walmart" but not "Bestbuy". A store can appear more than once because a person can visit the same store multiple times. How can I filter this data in the spark-shell using Scala?

Input data:

ID,Store
1,Walmart
1,Walmart
1,Bestbuy
2,Target
3,Walmart
4,Bestbuy

Expected output: 3,Walmart

I have already gotten the output using DataFrames and running a SQL query in the Spark context. But is there a way to do this without DataFrames, using groupByKey/reduceByKey etc.? Can someone help me with the code? After map -> groupByKey a ShuffledRDD is formed, and I am having difficulty filtering the CompactBuffer.

The code I got working using sqlContext is below:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

case class Person(id: Int, store: String)

// Parse each "id,store" line into a Person and register it as a temp table.
val people = sc.textFile("examples/src/main/resources/people.txt")
       .map(_.split(","))
       .map(p => Person(p(0).trim.toInt, p(1)))
people.registerTempTable("people")

// Keep ids whose only store among ('Walmart','Bestbuy') is 'Walmart'.
val result = sqlContext.sql("select id, store from people left semi join (select id from people where store in ('Walmart','Bestbuy') group by id having count(distinct store)=1) sample on people.id=sample.id and people.store='Walmart'")
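
As an aside, the same result can probably be expressed without the self-join by aggregating per id with conditional counts. This is only a sketch against the same `people` temp table (it returns just the ids), and syntax support may vary across Spark SQL versions:

// Hypothetical alternative: one aggregation per id instead of a semi join.
val walmartOnlyIds = sqlContext.sql(
  "select id from people group by id " +
  "having sum(case when store = 'Walmart' then 1 else 0 end) > 0 " +
  "and sum(case when store = 'Bestbuy' then 1 else 0 end) = 0")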

The code I am trying now is as below, but I am stuck after the third step:

val data = sc.textFile("examples/src/main/resources/people.txt")
      .filter(!_.contains("ID"))                     // drop the header line
      .map(x => (x.split(",")(0), x.split(",")(1)))  // (id, store) pairs
val dataGroup = data.groupByKey()
val dataFiltered = dataGroup.map { case (x, y) =>
    val url = y.flatMap(x => x.split(",")).toList
    if (!url.contains("Bestbuy") && url.contains("Walmart")) {
      x.map(x => (x, y))
    }
}

If I do dataFiltered.collect(), I get Array[Any] = Array(Vector((3,Walmart)), (), ())

Please help me with how to extract the output from this step.


It would be easier if you showed us the code you tried - a minimal, complete, verifiable example (see http://stackoverflow.com/help/mcve) –


@TzachZohar: I have updated the code in the question, please take a look at it. Thanks! – bigdataenthusiast


dataFiltered.collect().foreach(println) gives me Vector((2,abc.com)) () () but I only need => 3,Walmart – bigdataenthusiast

Answers


To filter the grouped RDD, just use RDD.filter:

val dataGroup = data.groupByKey() 

val dataFiltered = dataGroup.filter { 
    // keep only lists that contain Walmart but do not contain Bestbuy: 
    case (x, y) => val l = y.toList; l.contains("Walmart") && !l.contains("Bestbuy") 
} 

dataFiltered.foreach(println) // prints: (3,CompactBuffer(Walmart)) 

// if you want to flatten this back to tuples of (id, store): 
val result = dataFiltered.flatMap { case (id, stores) => stores.map(store => (id, store)) } 

result.foreach(println) // prints: (3, Walmart) 
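
Since the question also asks about reduceByKey, here is a minimal sketch of the same filter that avoids groupByKey by reducing per-id store sets instead (this assumes the same `data` RDD of (id, store) pairs built above):

// Build the set of distinct stores per id, then keep ids whose set
// contains Walmart but not Bestbuy, and flatten back to (id, store) pairs.
val storeSets = data.mapValues(store => Set(store)).reduceByKey(_ ++ _)
val walmartOnly = storeSets
  .filter { case (_, stores) => stores.contains("Walmart") && !stores.contains("Bestbuy") }
  .flatMap { case (id, stores) => stores.map(store => (id, store)) }

walmartOnly.foreach(println) // prints: (3,Walmart)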

Thank you so much @TzachZohar, it worked perfectly. I was unnecessarily complicating things while filtering the RDD – bigdataenthusiast


I also tried another approach, and it worked too:

val data = sc.textFile("examples/src/main/resources/people.txt")
    .filter(!_.contains("ID"))                     // drop the header line
    .map(x => (x.split(",")(0), x.split(",")(1)))
data.cache()
val dataWalmart = data.filter { case (x, y) => y.contains("Walmart") }.distinct()
val dataBestbuy = data.filter { case (x, y) => y.contains("Bestbuy") }.distinct()
// ids that visited Walmart, minus ids that also visited Bestbuy
val result = dataWalmart.subtractByKey(dataBestbuy)
data.unpersist()
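
Collecting the result (assuming the same input file as above) should then yield only the Walmart-only visitor:

result.collect().foreach(println) // expected output: (3,Walmart)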