2017-07-29 89 views
3

有關於這個問題在這裏一個問題:爆炸火花SQL表中的多個列

Explode (transpose?) multiple columns in Spark SQL table

假設我們有如下額外列:

**userId someString  varA  varB  varC varD** 
    1  "example1" [0,2,5] [1,2,9] [a,b,c] [red,green,yellow] 
    2  "example2" [1,20,5] [9,null,6] [d,e,f] [white,black,cyan] 

締結輸出像下面:

userId someString  varA  varB varC  varD 
    1  "example1"  0   1  a  red 
    1  "example1"  2   2  b  green 
    1  "example1"  5   9  c  yellow 
    2  "example2"  1   9  d  white 
    2  "example2"  20  null e  black 
    2  "example2"  5   6  f  Cyan 

答案是通過定義udf as:

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) 

並定義「withColumn」。

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
    $"userId", $"someString", 
    $"vars._1".alias("varA"), $"vars._2".alias("varB")).show 

如果我們需要擴展上面的回答,更多的列,什麼是修改上面的代碼最簡單的方法。請任何幫助。

回答

3

udf的zip似乎可以,但如果需要更多集合,則需要擴展。不幸的是,沒有真正好的方法來壓縮4 Seqs,但這應該工作:

def assertSameSize(arrs:Seq[_]*) = { 
assert(arrs.map(_.size).distinct.size==1,"sizes differ") 
} 

val zip4 = udf((xa:Seq[Long],xb:Seq[Long],xc:Seq[String],xd:Seq[String]) => { 
    assertSameSize(xa,xb,xc,xd) 
    xa.indices.map(i=> (xa(i),xb(i),xc(i),xd(i))) 
    } 
) 
4

我假設varA,varB,varC,varD的大小與您的示例保持不變。

scala> case class Input(user_id : Integer,someString : String, varA : Array[Integer],varB : Array[Integer],varC : Array[String], varD : Array[String]) 
defined class Input 

scala> case class Result(user_id : Integer,someString : String , varA : Integer,varB : Integer,varC : String, varD : String) 
defined class Result 

scala> val obj1 = Input(1,"example1",Array(0,2,5),Array(1,2,9),Array("a","b","c"),Array("red","green","yellow")) 
obj1: Input = Input(1,example1,[Ljava.lang.Integer;@77c43ec2,[Ljava.lang.Integer;@3a332d08,[Ljava.lang.String;@5c1222da,[Ljava.lang.String;@114e051a) 

scala> val obj2 = Input(2,"example2",Array(1,20,5),Array(9,null,6),Array("d","e","f"),Array("white","black","cyan")) 
obj2: Input = Input(2,example2,[Ljava.lang.Integer;@326db38,[Ljava.lang.Integer;@50914458,[Ljava.lang.String;@339b73ae,[Ljava.lang.String;@1567ee0a) 

scala> val input_df = sc.parallelize(Seq(obj1,obj2)).toDS 
input_df: org.apache.spark.sql.Dataset[Input] = [user_id: int, someString: string ... 4 more fields] 

scala> input_df.show 
+-------+----------+----------+------------+---------+--------------------+ 
|user_id|someString|  varA|  varB|  varC|    varD| 
+-------+----------+----------+------------+---------+--------------------+ 
|  1| example1| [0, 2, 5]| [1, 2, 9]|[a, b, c]|[red, green, yellow]| 
|  2| example2|[1, 20, 5]|[9, null, 6]|[d, e, f]|[white, black, cyan]| 
+-------+----------+----------+------------+---------+--------------------+ 

scala> def getResult(row : Input) : Iterable[Result] = { 
    |    val user_id = row.user_id 
    |    val someString = row.someString 
    |    val varA = row.varA 
    |    val varB = row.varB 
    |    val varC = row.varC 
    |    val varD = row.varD 
    |    val seq = for(i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i),varC(i),varD(i))} 
    |    seq.toSeq 
    |   } 
getResult: (row: Input)Iterable[Result] 

scala> val resdf = input_df.flatMap{row => getResult(row)} 
resdf: org.apache.spark.sql.Dataset[Result] = [user_id: int, someString: string ... 4 more fields] 

scala> resdf.show 
+-------+----------+----+----+----+------+ 
|user_id|someString|varA|varB|varC| varD| 
+-------+----------+----+----+----+------+ 
|  1| example1| 0| 1| a| red| 
|  1| example1| 2| 2| b| green| 
|  1| example1| 5| 9| c|yellow| 
|  2| example2| 1| 9| d| white| 
|  2| example2| 20|null| e| black| 
|  2| example2| 5| 6| f| cyan| 
+-------+----------+----+----+----+------+ 

如果varA,varB,varC或varD列的大小不同,那麼這些場景需要是句柄。

您可以遍歷的最大大小並輸出空值,如果通過處理異常值不存在任何列中的值。

+0

謝謝安庫什,如果數據是從外部來源如hdfs讀取怎麼辦。我們如何填充'case class Input(user_id:Integer,someString:String,varA:Array [Integer],varB:Array [Integer],varC:Array [String],varD:Array [String] )'。 –

+0

這成爲一個單獨的問題。你加載了什麼樣的數據? (csv/json)是什麼格式?您需要重新格式化數據。 –