2017-08-03 80 views
3
case class Foo1(codes:Seq[String], name:String) 
case class Foo2(code:String, description:String) 

val ds1: Dataset[Foo1] = sqlContext.sparkContext.parallelize(Seq(
    Foo1(Seq("A"),   "foo1"), 
    Foo1(Seq("A", "B"),  "foo2"), 
    Foo1(Seq("B", "C", "D"), "foo3"), 
    Foo1(Seq("C"),   "foo4"), 
    Foo1(Seq("C", "D"),  "foo5") 
)).toDS 

val ds2: Dataset[Foo2] = sqlContext.sparkContext.parallelize(Seq(
    Foo2("A", "product A"), 
    Foo2("B", "product B"), 
    Foo2("C", "product C"), 
    Foo2("D", "product D"), 
    Foo2("E", "product E") 
)).toDS 

val j = ds1.join(ds2, ds2.col("code").isin(ds1.col("codes"))) 

希望這Scala代碼片段清楚什麼,我試圖完成加入星火Dataframes,我們的數據是結構化的,這樣一個數據集具有包含值的數組一列,我希望將該集合中的值加入另一個數據集。因此,例如Seq("A", "B")ds1將加入"A""B"ds2與「ISIN」運營商

這個例子是在斯卡拉,但我相信一個類似的例子將適用於Java或Python與其他集合類。

上欄的「ISIN」運營商似乎正是我想要的,而這種構建並運行,但是當我運行它,我得到以下錯誤:

org.apache.spark.sql.AnalysisException: cannot resolve '(`code` IN (`codes`))' due to data type mismatch: Arguments must be same type;; 

進一步閱讀我看到isin()希望採取可變參數(「splatted args」),似乎更適合於filter()。所以我的問題是,這是這個運算符的預期用途,還是有其他方式來執行這種類型的連接?

+2

有重複的例子很好地格式化的問題。這些日子你看不到很多。 – philantrovert

回答

3

請使用array_contains

ds1.crossJoin(ds2).where("array_contains(codes, code)").show 

+---------+----+----+-----------+ 
| codes|name|code|description| 
+---------+----+----+-----------+ 
|  [A]|foo1| A| product A| 
| [A, B]|foo2| A| product A| 
| [A, B]|foo2| B| product B| 
|[B, C, D]|foo3| B| product B| 
|[B, C, D]|foo3| C| product C| 
|[B, C, D]|foo3| D| product D| 
|  [C]|foo4| C| product C| 
| [C, D]|foo5| C| product C| 
| [C, D]|foo5| D| product D| 
+---------+----+----+-----------+ 

如果使用星火x或2.0替換crossJoin標準加盟,並啓用交叉連接配置中,如果需要的話。

它可能是由可避免的笛卡爾乘積與explode

ds1.withColumn("code", explode($"codes")).join(ds2, Seq("code")).show 
+----+---------+----+-----------+            
|code| codes|name|description| 
+----+---------+----+-----------+ 
| B| [A, B]|foo2| product B| 
| B|[B, C, D]|foo3| product B| 
| D|[B, C, D]|foo3| product D| 
| D| [C, D]|foo5| product D| 
| C|[B, C, D]|foo3| product C| 
| C|  [C]|foo4| product C| 
| C| [C, D]|foo5| product C| 
| A|  [A]|foo1| product A| 
| A| [A, B]|foo2| product A| 
+----+---------+----+-----------+ 
+1

這正是我想要的,謝謝。在5分鐘內解決了沖刷文件的時間。 –