2017-06-05 91 views
1

我有如下記錄。如果第三個屬性爲All,我想將單個記錄轉換爲值分別爲EXTERNALINTERNAL的兩個記錄。如何根據列生成多個記錄?

輸入數據集:

Surender,cts,INTERNAL 
Raja,cts,EXTERNAL 
Ajay,tcs,All 

預期輸出:

Surender,cts,INTERNAL 
Raja,cts,EXTERNAL 
Ajay,tcs,INTERNAL 
Ajay,tcs,EXTERNAL 

我的星火代碼:

case class Customer(name:String,organisation:String,campaign_type:String) 

val custRDD = sc.textFile("/user/cloudera/input_files/customer.txt") 

val mapRDD = custRDD.map(record => record.split(",")) 
    .map(arr => (arr(0),arr(1),arr(2)) 
    .map(tuple => { 
     val name   = tuple._1.trim 
     val organisation = tuple._2.trim 
     val campaign_type = tuple._3.trim.toUpperCase 
     Customer(name, organisation, campaign_type) 
    }) 

mapRDD.toDF().registerTempTable("customer_processed") 

sqlContext.sql("SELECT * FROM customer_processed").show 

可能有人幫我解決這個問題?

回答

3

因爲它是斯卡拉...

如果你想寫一個更地道Scala代碼(也許還有一些交易表現,由於缺乏優化有更地道的代碼),你可以使用flatMap運營商(除去隱式參數):

flatMap [U](FUNC:(T)⇒TraversableOnce [U]):數據集[U]通過首先將函數應用於該數據集的所有元素,則返回一個新的數據集,和然後展平結果。

注:flatMap相當於explode功能,但你沒有註冊一個UDF(如在其他的答案)。

一個解決辦法如下:

// I don't care about the names of the columns since we use Scala 
// as you did when you tried to write the code 
scala> input.show 
+--------+---+--------+ 
|  _c0|_c1|  _c2| 
+--------+---+--------+ 
|Surender|cts|INTERNAL| 
| Raja|cts|EXTERNAL| 
| Ajay|tcs|  All| 
+--------+---+--------+ 

val result = input. 
    as[(String, String, String)]. 
    flatMap { case r @ (name, org, campaign) => 
    if ("all".equalsIgnoreCase(campaign)) { 
     Seq("INTERNAL", "EXTERNAL").map { cname => 
     (name, org, cname) 
     } 
    } else Seq(r) 
    } 
scala> result.show 
+--------+---+--------+ 
|  _1| _2|  _3| 
+--------+---+--------+ 
|Surender|cts|INTERNAL| 
| Raja|cts|EXTERNAL| 
| Ajay|tcs|INTERNAL| 
| Ajay|tcs|EXTERNAL| 
+--------+---+--------+ 

兩個相比較,查詢的性能,即基於flatMap VS explode基礎的問題,我覺得explode基可以更快略和優化的更好因爲有些代碼是在Spark的控制之下的(在邏輯運算符被映射到物理couterparts之前使用邏輯運算符)。在flatMap中,整個優化是您作爲Scala開發人員的責任。

下面的紅色圍成的區域對應於flatMap基於代碼和警告標誌是非常昂貴的成本和DeserializeToObject運營商SerializeFromObject

enter image description here

有趣的是每次查詢及其持續時間星火作業的數量。看起來基於explode的查詢需要2個Spark作業和200 ms,而基於flatMap的查詢僅需要1個Spark作業和43 ms。

enter image description here

這讓我吃驚了很多,並建議flatMap基於查詢可能更快(!)

+0

儘可能多我喜歡你已經把答案的方式,我不同意你的基準方法。您將比較flatMap與3行的爆炸作爲輸入,並得出結論:flatMap速度更快,這裏是我不同意的地方。 – eliasah

+0

正確。我同意你的觀點,數據集的大小可能會導致一個錯誤的結論,但是......直到你證明我錯了,那是我們唯一的基準。 「信仰與結果」 - 讓自己選擇你最喜歡的。 –

+0

您是否認真對待3行數據集上的Spark代碼基準測試會告訴您有關性能的任何信息? – mtoto

3

您可以使用和udf改造包含字符串把它映射到活動類型的Seq然後explode的campaign_type柱:

val campaignType_ : (String => Seq[String]) = { 
    case s if s == "ALL" => Seq("EXTERNAL", "INTERNAL") 
    case s => Seq(s) 
} 

val campaignType = udf(campaignType_) 

val df = Seq(("Surender", "cts", "INTERNAL"), 
    ("Raja", "cts", "EXTERNAL"), 
    ("Ajay", "tcs", "ALL")) 
    .toDF("name", "organisation", "campaign_type") 

val step1 = df.withColumn("campaign_type", campaignType($"campaign_type")) 
step1.show 
// +--------+------------+--------------------+ 
// | name|organisation|  campaign_type| 
// +--------+------------+--------------------+ 
// |Surender|   cts|   [INTERNAL]| 
// | Raja|   cts|   [EXTERNAL]| 
// | Ajay|   tcs|[EXTERNAL, INTERNAL]| 
// +--------+------------+--------------------+ 

val step2 = step1.select($"name", $"organisation", explode($"campaign_type")) 
step2.show 
// +--------+------------+--------+ 
// | name|organisation|  col| 
// +--------+------------+--------+ 
// |Surender|   cts|INTERNAL| 
// | Raja|   cts|EXTERNAL| 
// | Ajay|   tcs|EXTERNAL| 
// | Ajay|   tcs|INTERNAL| 
// +--------+------------+--------+ 

編輯:

你不實際需要一個udf,你可以使用when()。否則謂詞代替step1,如下所示:

val step1 = df.withColumn("campaign_type", 
when(col("campaign_type") === "ALL", array("EXTERNAL", "INTERNAL")).otherwise(array(col("campaign_type")))