2017-05-09 106 views
0

我的數據是這樣的:星火使用reduceByKey

customer1表| ITEM1:X1,X2,X3; ITEM2:X1,X4,X5; item1:x1,x3,x6 | time1 | url
Customer1 | ITEM1:X1,X7,X3; ITEM2:X1,X4,X5; item3:x5 | time2 | url2
Customer2 | ITEM1:X1,X7,X3;項目3:X5 |時間3 | URL3

我想ReduceByKey相同customerIds和mapValues獲得不同項目的工會爲每個客戶ID:

customer1表| ITEM1:X1,X2,X3; ITEM2:X1,X4,X5; ITEM1:X1,X3,X6; ITEM1:X1,X7,X3;項目3:X5

此我能夠通過實現:

VAL線= spark.sparkContext.textFile(參數(0))
VAL記錄= line.map(L =>升(x,y)=> x.union(y))。mapValues(x => .split(「\ |」))。map(l =>(l(0),l(1)))。reduceByKey x.distinct)

現在,我想在第二列中每個項目是唯一的,以及在同一個鍵的所有值應使用工會和獨特,拿東西等加入:

Customer1 | ITEM1:X1,X2,X3,X6,X7; ITEM2:X1,X4,X5;項目3:X5

一旦做到這一點我想選擇每個x的所有頻率,例如:X1:2,X 2:1 .... 和更新的X(1-10)的向量對CustomerID與我得到的頻率。

這可以在火花中實現嗎?

回答

0

是的,你當然可以在Spark中做到這一點!然而,你接近這個問題的方式使得它看起來有點難以實現。

所以我可以顯示一個完整的複製pastable到REPL例子,讓我們假設你的數據存儲在一個字符串(不ARGS(0)文件)

val data = """Customer1| item1:x1,x2,x3; item2:x1,x4,x5; item1:x1,x3,x6|time1|url 
Customer1| item1:x1,x7,x3; item2:x1,x4,x5; item3:x5|time2|url2 
Customer2| item1:x1,x7,x3; item3:x5|time3|url3""" 

和RDD你所說的「線」,可以被讀入RDD「rdd」爲

val rdd = sc.parallelize(data.split("\n")) 

到目前爲止沒有新的東西。下一步是重要的一步。我們可以準備我們的數據一次完成所有工作,而不是分層計數和彙總。這是更可讀性,也更有效,因爲它是一個單一的地圖,然後是一個單一的減少。

val mapped= rdd.flatMap(line => { 
    val arr = line.split("\\|") 
    val customer = arr(0) 
    val items = arr(1) 
    val time = arr(2) 
    val url = arr(3) 

    items.split(";").flatMap(item => { 
     val itemKey = item.split(":")(0) 
     val itemValues = item.split(":")(1).split(",") 

     itemValues.map(value => (customer, itemKey, value, time, url)) 
    }) 
}) 

我們可以看到什麼是在這一點,我們可以用mapped.toDF("customer", "itemId", "itemValue", "time", "url").show

+---------+------+---------+-----+----+ 
| customer|itemId|itemValue| time| url| 
+---------+------+---------+-----+----+ 
|Customer1| item1|  x1|time1| url| 
|Customer1| item1|  x2|time1| url| 
|Customer1| item1|  x3|time1| url| 
|Customer1| item2|  x1|time1| url| 
|Customer1| item2|  x4|time1| url| 
|Customer1| item2|  x5|time1| url| 
|Customer1| item1|  x1|time1| url| 
|Customer1| item1|  x3|time1| url| 
|Customer1| item1|  x6|time1| url| 
|Customer1| item1|  x1|time2|url2| 
|Customer1| item1|  x7|time2|url2| 
|Customer1| item1|  x3|time2|url2| 
|Customer1| item2|  x1|time2|url2| 
|Customer1| item2|  x4|time2|url2| 
|Customer1| item2|  x5|time2|url2| 
|Customer1| item3|  x5|time2|url2| 
|Customer2| item1|  x1|time3|url3| 
|Customer2| item1|  x7|time3|url3| 
|Customer2| item1|  x3|time3|url3| 
|Customer2| item3|  x5|time3|url3| 
+---------+------+---------+-----+----+ 

很好地打印出來。最後,我們可以指望,減少到你所需要的載體:

val reduced = mapped.map{case (customer, itemKey, itemValue, time, url) => ((customer, itemKey, itemValue), 1)}. 
    reduceByKey(_+_). 
    map{case ((customer, itemKey, itemValue), count) => (customer, itemKey, itemValue, count)} 

並查看它:reduced.toDF("customer", "itemKey", "itemValue", "count").show

+---------+-------+---------+-----+            
| customer|itemKey|itemValue|count| 
+---------+-------+---------+-----+ 
|Customer1| item1|  x2| 1| 
|Customer1| item1|  x1| 3| 
|Customer2| item1|  x7| 1| 
|Customer1| item1|  x6| 1| 
|Customer1| item1|  x7| 1| 
|Customer2| item1|  x3| 1| 
|Customer2| item3|  x5| 1| 
|Customer1| item2|  x5| 2| 
|Customer1| item2|  x4| 2| 
|Customer1| item2|  x1| 2| 
|Customer1| item3|  x5| 1| 
|Customer1| item1|  x3| 3| 
|Customer2| item1|  x1| 1| 
+---------+-------+---------+-----+ 

如果您需要將所有組合到矢量的Array/Seq表示中,則可以通過進一步聚合數據來完成此操作。希望這可以幫助!

+0

也有值的時間和URL不存在,在這種情況下,arr(2)和arr(3)將失敗,ArrayIndexOutOfBoundsException。是否有可能過濾4列的行。例如line.split(「\\ |」))。filter(l => l.length == 4) 我可以忽略沒有url和time的數據。 –

+0

只需從元組中刪除這些列,然後如果他們不需要。或者,'import scala.util.Try',然後將這些行更新爲 'val time = Try(Some(arr(2)))。getOrElse(None)'和 'val url = Try(Some(arr )))。getOrElse(None)' –

+0

取決於您是否需要這些行中的值。如果你不這樣做,那麼你可以按照你的建議進行過濾。如果你這樣做,然後看到以前的評論:) –