case class Foo1(codes:Seq[String], name:String)
case class Foo2(code:String, description:String)
val ds1: Dataset[Foo1] = sqlContext.sparkContext.parallelize(Seq(
Foo1(Seq("A"), "foo1"),
Foo1(Seq("A", "B"), "foo2"),
Foo1(Seq("B", "C", "D"), "foo3"),
Foo1(Seq("C"), "foo4"),
Foo1(Seq("C", "D"), "foo5")
)).toDS
val ds2: Dataset[Foo2] = sqlContext.sparkContext.parallelize(Seq(
Foo2("A", "product A"),
Foo2("B", "product B"),
Foo2("C", "product C"),
Foo2("D", "product D"),
Foo2("E", "product E")
)).toDS
val j = ds1.join(ds2, ds2.col("code").isin(ds1.col("codes")))
希望這Scala代碼片段清楚什麼,我試圖完成加入星火Dataframes,我們的數據是結構化的,這樣一個數據集具有包含值的數組一列,我希望將該集合中的值加入另一個數據集。因此,例如Seq("A", "B")
在ds1
將加入"A"
和"B"
在ds2
。與「ISIN」運營商
這個例子是在斯卡拉,但我相信一個類似的例子將適用於Java或Python與其他集合類。
上欄的「ISIN」運營商似乎正是我想要的,而這種構建並運行,但是當我運行它,我得到以下錯誤:
org.apache.spark.sql.AnalysisException: cannot resolve '(`code` IN (`codes`))' due to data type mismatch: Arguments must be same type;;
進一步閱讀我看到isin()
希望採取可變參數(「splatted args」),似乎更適合於filter()
。所以我的問題是,這是這個運算符的預期用途,還是有其他方式來執行這種類型的連接?
有重複的例子很好地格式化的問題。這些日子你看不到很多。 – philantrovert