
Spark: create a map-type column from the values of other columns, using DataFrames. Sample input:

Item_Id Item_Name Buyer's_Id Buyers_Name
0001    Keyboard  10000      ABC
0002    Monitor   10010      XYZ
0001    Keyboard  10005      DXC

Sample intermediate output:

0001,Keyboard,{"Buyer's_Id":"10000","Buyers_Name":"ABC"}  
0002,Monitor,{"Buyer's_Id":"10010","Buyers_Name":"XYZ"}  
0001,Keyboard,{"Buyer's_Id":"10005","Buyers_Name":"DXC"}  

Final output:

0001,Keyboard,[{"Buyer's_Id":"10000","Buyers_Name":"ABC"},{"Buyer's_Id":"10005","Buyers_Name":"DXC"}] 
0002,Monitor,[{"Buyer's_Id":"10010","Buyers_Name":"XYZ"}]  
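
To reproduce this, here is a minimal sketch that builds the sample input as a DataFrame in spark-shell; treating Item_Id and Buyer's_Id as integers is an assumption that matches the answer's output below:

// Recreate the sample input; assumes spark.implicits._ is in scope,
// as it is by default in spark-shell.
import spark.implicits._

val input_df = Seq(
  (1, "Keyboard", 10000, "ABC"),
  (2, "Monitor",  10010, "XYZ"),
  (1, "Keyboard", 10005, "DXC")
).toDF("Item_Id", "Item_Name", "Buyer's_Id", "Buyers_Name")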

Answer


What you want to achieve can be done with either:

map, which processes each row, or

mapPartitions, which processes each partition (a mapPartitions sketch follows the intermediate output below)

scala> input_df.show
+-------+---------+----------+-----------+
|Item_Id|Item_Name|Buyer's_Id|Buyers_Name|
+-------+---------+----------+-----------+
|      1| Keyboard|     10000|        ABC|
|      2|  Monitor|     10010|        XYZ|
|      1| Keyboard|     10005|        DXC|
+-------+---------+----------+-----------+

import org.apache.spark.sql.Row 
import org.apache.spark.sql.catalyst.encoders.RowEncoder 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.collect_set 

Since your intermediate DataFrame has a different schema than the input, we need to define that new schema:

scala> // schema for the intermediate DataFrame (it differs from the input schema)
scala> val schema = StructType(Seq(
     |   StructField("item_number", IntegerType),
     |   StructField("item_name", StringType),
     |   StructField("json_string", StringType)
     | ))

scala> // encoder so that map can emit Rows of the new schema
scala> val encoder = RowEncoder(schema)

scala> val intermediate_df = input_df.map{row =>
     |   val itm_nbr  = row.getAs[Integer]("Item_Id")
     |   val itm_nme  = row.getAs[String]("Item_Name")
     |   val byer_id  = row.getAs[Integer]("Buyer's_Id")
     |   val byer_nme = row.getAs[String]("Buyers_Name")
     |   // quote the values so the string is valid JSON
     |   val req_string = s"""{"Buyer's_Id" : "$byer_id","Buyers_Name" : "$byer_nme"}"""
     |   Row(itm_nbr, itm_nme, req_string)
     | }(encoder)
intermediate_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [item_number: int, item_name: string ... 1 more field] 

scala> intermediate_df.show(false)
+-----------+---------+----------------------------------------------+
|item_number|item_name|json_string                                   |
+-----------+---------+----------------------------------------------+
|1          |Keyboard |{"Buyer's_Id" : "10000","Buyers_Name" : "ABC"}|
|2          |Monitor  |{"Buyer's_Id" : "10010","Buyers_Name" : "XYZ"}|
|1          |Keyboard |{"Buyer's_Id" : "10005","Buyers_Name" : "DXC"}|
+-----------+---------+----------------------------------------------+
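
For comparison, here is a minimal sketch of the same intermediate step written with mapPartitions, reusing the schema and encoder defined above. The function receives an Iterator[Row] per partition and returns an Iterator[Row], so any expensive setup (e.g. a JSON serializer) could be done once per partition instead of once per row:

// Same transformation, one iterator per partition instead of one row at a time.
val intermediate_df2 = input_df.mapPartitions { rows =>
  rows.map { row =>
    val byer_id  = row.getAs[Integer]("Buyer's_Id")
    val byer_nme = row.getAs[String]("Buyers_Name")
    val json     = s"""{"Buyer's_Id" : "$byer_id","Buyers_Name" : "$byer_nme"}"""
    Row(row.getAs[Integer]("Item_Id"), row.getAs[String]("Item_Name"), json)
  }
}(encoder)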

scala> val result_df = intermediate_df.groupBy('item_number,'item_name).agg(collect_set('json_string).as("json_list")).orderBy('item_number) 
result_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [item_number: int, item_name: string ... 1 more field] 

scala> result_df.show(false)
+-----------+---------+------------------------------------------------------------------------------------------------+
|item_number|item_name|json_list                                                                                       |
+-----------+---------+------------------------------------------------------------------------------------------------+
|1          |Keyboard |[{"Buyer's_Id" : "10000","Buyers_Name" : "ABC"}, {"Buyer's_Id" : "10005","Buyers_Name" : "DXC"}]|
|2          |Monitor  |[{"Buyer's_Id" : "10010","Buyers_Name" : "XYZ"}]                                                |
+-----------+---------+------------------------------------------------------------------------------------------------+
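
As an aside, the same result can be had without hand-built JSON strings or a RowEncoder by using only built-in functions (a sketch assuming Spark 2.1+, where to_json is available):

import org.apache.spark.sql.functions.{col, collect_set, struct, to_json}

// to_json handles quoting and escaping; struct packs the buyer columns.
val result_df2 = input_df
  .withColumn("json_string", to_json(struct(col("Buyer's_Id"), col("Buyers_Name"))))
  .groupBy(col("Item_Id"), col("Item_Name"))
  .agg(collect_set(col("json_string")).as("json_list"))
  .orderBy(col("Item_Id"))

The formatting differs slightly from the manual version: to_json emits no spaces around the colons and leaves numeric values unquoted.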

Hope this helps!


Thanks. It helped a lot –


If this answer helped you, you can accept it and upvote. Thanks –