我有以下需要使用spark(scala)進行排序的數據,我只需要訪問「Walmart」但不是「Bestbuy」的人員的ID。商店可能是重複的,因爲一個人可以多次訪問商店。如何使用scala過濾spark-shell中的數據?
輸入數據:
ID,商店
1,沃爾瑪
1,沃爾瑪
1,Bestbuy驗
2,目標
3,沃爾瑪
4,百思買
產出預期: 3,沃爾瑪
我一直在使用dataFrames並運行SQL查詢的火花背景下得到了輸出。但有沒有辦法使用groupByKey
/reduceByKey
等沒有dataFrames做到這一點。有人可以幫我的代碼,地圖 - >groupByKey
,ShuffleRDD
已經形成,我面臨着難以過濾CompactBuffer
!
與我得到它使用sqlContext
的代碼如下:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
case class Person(id: Int, store: String)
val people = sc.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(p => Person(p(1)trim.toInt, p(1)))
people.registerTempTable("people")
val result = sqlContext.sql("select id, store from people left semi join (select id from people where store in('Walmart','Bestbuy') group by id having count(distinct store)=1) sample on people.id=sample.id and people.url='Walmart'")
,我現在想的代碼是這樣的,但我在第三步驟之後來襲:
val data = sc.textFile("examples/src/main/resources/people.txt")
.map(x=> (x.split(",")(0),x.split(",")(1)))
.filter(!_.filter("id"))
val dataGroup = data.groupByKey()
val dataFiltered = dataGroup.map{case (x,y) =>
val url = y.flatMap(x=> x.split(",")).toList
if (!url.contains("Bestbuy") && url.contains("Walmart")){
x.map(x=> (x,y))}}
如果我做dataFiltered.collect(),我得到 Array [Any] = Array(Vector((3,Walmart)),(),())
請幫助我如何提取輸出這一步
如果您向我們展示您嘗試的代碼會更容易 - 一個最小的,完整的,可驗證的示例(請參閱http://stackoverflow.com/help/mcve) –
@TzachZohar:我已經更新了問題中的代碼,請看一看在它。謝謝! – bigdataenthusiast
dataFiletered.collect()的foreach(的println)給了我 矢量((2,abc.com)) () () 但我只需要=> 3,沃爾瑪 – bigdataenthusiast