2017-12-27 333 views
0

我在Spark(v2.1.1)中有一個包含分層數據的3列(如下所示)的數據集。Apache Spark中的分層數據處理

  • 我的目標的目標是增量編號分配給基礎上,父子層次的每一行。從圖形上可以說,分層數據是一個樹的集合。
  • 根據下表,我已經有基於'Global_ID'分組的行。現在我想以 的增量順序生成'Value'列,但是基於 'Parent'和'Child'列的數據層次結構。

表格表示(數值是所需的輸出):

+-----------+--------+-------+   +-----------+--------+-------+-------+ 
    |  Current Dataset  |   |  Desired Dataset (Output)  | 
    +-----------+--------+-------+   +-----------+--------+-------+-------+ 
    | Global_ID | Parent | Child |   | Global_ID | Parent | Child | Value | 
    +-----------+--------+-------+   +-----------+--------+-------+-------+ 
    |  111 | 111 | 123 |   |  111 | 111 | 111 |  1 | 
    |  111 | 135 | 246 |   |  111 | 111 | 123 |  2 | 
    |  111 | 123 | 456 |   |  111 | 123 | 789 |  3 | 
    |  111 | 123 | 789 |   |  111 | 123 | 456 |  4 | 
    |  111 | 111 | 111 |   |  111 | 111 | 135 |  5 | 
    |  111 | 135 | 468 |   |  111 | 135 | 246 |  6 | 
    |  111 | 135 | 268 |   |  111 | 135 | 468 |  7 | 
    |  111 | 268 | 321 |   |  111 | 135 | 268 |  8 | 
    |  111 | 138 | 139 |   |  111 | 268 | 321 |  9 | 
    |  111 | 111 | 135 |   |  111 | 111 | 138 | 10 | 
    |  111 | 111 | 138 |   |  111 | 138 | 139 | 11 | 
    |  222 | 222 | 654 |   |  222 | 222 | 222 | 12 | 
    |  222 | 654 | 721 |   |  222 | 222 | 987 | 13 | 
    |  222 | 222 | 222 |   |  222 | 222 | 654 | 14 | 
    |  222 | 721 | 127 |   |  222 | 654 | 721 | 15 | 
    |  222 | 222 | 987 |   |  222 | 721 | 127 | 16 | 
    |  333 | 333 | 398 |   |  333 | 333 | 333 | 17 | 
    |  333 | 333 | 498 |   |  333 | 333 | 398 | 18 | 
    |  333 | 333 | 333 |   |  333 | 333 | 498 | 19 | 
    |  333 | 333 | 598 |   |  333 | 333 | 598 | 20 | 
    +-----------+--------+-------+   +-----------+--------+-------+-------+ 

樹表示(期望值旁邊的每個節點表示):

     +-----+           +-----+ 
        1 | 111 |          17 | 333 | 
         +--+--+           +--+--+ 
         |             | 
     +---------------+--------+-----------------+   +----------+----------+ 
     |      |     |   |   |   | 
     +--v--+     +--v--+   +--v--+  +--v--+ +--v--+ +--v--+ 
    2 | 123 |    5 | 135 |  10 | 138 |  | 398 | | 498 | | 598 | 
     +--+--+     +--+--+   +--+--+  +--+--+ +--+--+ +--+--+ 
    +-----+-----+   +--------+--------+  |   18   19   20 
    |   |   |  |  |  | 
+--v--+  +--v--+ +--v--+ +--v--+ +--v--+ +--v--+ 
| 789 |  | 456 | | 246 | | 468 | | 268 | | 139 |     +-----+ 
+-----+  +-----+ +-----+ +-----+ +--+--+ +-----+    12 | 222 | 
    3   4   6  7  8 |  11     +--+--+ 
             +--v--+        | 
             | 321 |      +------+-------+ 
             +--+--+      |    | 
              9      +--v--+  +--v--+ 
                   13 | 987 | 14 | 654 | 
                    +--+--+  +--+--+ 
                        | 
                       +--v--+ 
                      15 | 721 | 
                       +--+--+ 
                        | 
                       +--v--+ 
                      16 | 127 | 
                       +--+--+ 

代碼段:

Dataset<Row> myDataset = spark 
       .sql("select Global_ID, Parent, Child from RECORDS"); 

JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID")) 
    .agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))), 
     functions.sort_array(functions.collect_list(new Column("Child").as("child_col")))) 
    .orderBy(new Column("Global_ID")) 
    .withColumn("vars", functions.explode(<Spark UDF>) 
    .select(new Column("vars"),new Column("parent_col"),new Column("child_col")) 
    .javaRDD().zipWithIndex(); 


// Sample UDF (TODO: Actual Implementation) 
spark.udf().register("computeValue", 
       (<Column Names>) -> <functionality & implementation>, 
       DataTypes.<xxx>); 

經過大量的調查研究,並通過博客,許多建議去,我曾嘗試下面的方法,但無濟於事,以實現我的方案的結果。

技術堆棧:

  • Apache的火花(V2.1。1)

  • 爪哇8

  • AWS EMR集羣(火花應用部署)


數據量:

  • 大約〜Dataset中

20000000點方法下的行嘗試:

  1. 星火GraphX + GraphFrames:

  2. 星火GraphX預凝膠API:


替代品的任何建議(或)在當前的方法修改將是很有益的,因爲我搞清楚這個用例的解決方案完全丟失。

感謝您的幫助!謝謝!

回答

0

注意:下面的解決方案是scala spark。您可以輕鬆轉換爲Java代碼。

檢查了這一點。我試着用Spark Sql來做這件事,你可以得到一個想法。基本上的想法是在對它們進行聚合和分組的同時對孩子,父母和全球身份進行排序。一旦按globalid進行分組和排序,則展開其餘部分。你會得到有序的結果表到以後你可以zipWithIndex添加等級(值)

import org.apache.spark.sql.SQLContext 
    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.expressions.UserDefinedFunction 
    import org.apache.spark.sql.functions.udf 

    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598)) 
    val ddd = sc.parallelize(t).toDF 
    val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys) 
    val dd1 = ddd 
    .groupBy($"_1") 
    .agg(sort_array(collect_list($"_2")).as("v"), 
     sort_array(collect_list($"_3")).as("w")) 
    .orderBy(asc("_1")) 
    .withColumn("vars", explode(zip($"v", $"w"))) 
    .select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex 

    dd1.collect 

輸出

res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2), 
([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9)) 
+0

這似乎是一個非常可行的解決方案。感謝您的代碼!我肯定會試試這個,但只是有點關注'收藏列表',因爲我的數據大約在2000萬行左右,但它應該是好的。如果您可以提供與Scala代碼完全相同的Java代碼,那就太好了,因爲我是Scala的新手。再次感謝! – Sridher

+0

@Sridher我認爲你可以很容易地將其轉換爲java代碼。這裏主要要注意的是你可以在java中複製的spark代碼。 –

+0

我將大部分代碼轉換爲與Java相當的代碼,但是面臨着您使用過的Spark UDF的一些問題。你能幫我解決UDF嗎?請參閱我編輯過的代碼片段。 – Sridher