2016-09-22 78 views
0

您好我有2個dataframes加入蟒蛇+ pyspark:上內差錯多列比較捧場pyspark

#df1 
name genre count 
satya drama 1 
satya action 3 
abc  drame 2 
abc  comedy 2 
def  romance 1 

#df2 
name max_count 
satya 3 
abc 2 
def 1 

現在我想加入上述2個DFS的名稱和數量== MAX_COUNT,但我收到錯誤

import pyspark.sql.functions as F 
from pyspark.sql.functions import count, col 
from pyspark.sql.functions import struct 
df = spark.read.csv('file',sep = '###', header=True) 
df1 = df.groupBy("name", "genre").count() 
df2 = df1.groupby('name').agg(F.max("count").alias("max_count")) 
#Now trying to join both dataframes 
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count)) 
final_df.show() ###Error 
#py4j.protocol.Py4JJavaError: An error occurred while calling o207.showString. 
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) 
#Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: count(1) 
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224) 

但成功與「左」加入

final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count), "left") 
final_df.show() ###Success but i don't want left join , i want inner join 

我的問題是,爲什麼上面的一個失敗,我我在那裏做錯了什麼?

我把這個鏈接「Find maximum row per group in Spark DataFrame」。使用第一個答案(2 groupby方法)。但是同樣的錯誤。

我在spark-2.0.0-bin-hadoop2.7和python 2.7上。

請建議。謝謝。

編輯:

上述場景可與火花1.6(這是相當奇怪,這有什麼錯火花2.0(或與我安裝,我將重新安裝,檢查和更新這裏))。

有沒有人在spark 2.0上試過這個,通過下面的Yaron的回答得到了成功?

+0

只是一個猜測.....列名與數據框方法衝突?例如。 'count'。不知道爲什麼只會影響內連接。你可以嘗試將'count'重命名爲'cnt',或者只是爲了排除這種可能性。 – RedBaron

+0

@ RedBaron-Alredy嘗試了這個.Same錯誤。 – Satya

回答

2

更新:由於使用「count」作爲列名,您的代碼似乎也失敗了。在DataFrame API中count似乎是受保護關鍵字 。 重命名計數爲「mycount」解決了問題。下面的工作代碼被修改爲支持我用來測試您的問題的Spark版本1.5.2。

df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/tmp/fac_cal.csv") 
df1 = df.groupBy("name", "genre").count() 
df1 = df1.select(col("name"),col("genre"),col("count").alias("mycount")) 
df2 = df1.groupby('name').agg(F.max("mycount").alias("max_count")) 
df2 = df2.select(col('name').alias('name2'),col("max_count")) 
#Now trying to join both dataframes 
final_df = df1.join(df2,[df1.name == df2.name2 , df1.mycount == df2.max_count]) 
final_df.show() 

+-----+---------+-------+-----+---------+ 
| name| genre|mycount|name2|max_count| 
+-----+---------+-------+-----+---------+ 
|brata| comedy|  2|brata|  2| 
|brata| drama|  2|brata|  2| 
|panda|adventure|  1|panda|  1| 
|panda| romance|  1|panda|  1| 
|satya| action|  3|satya|  3| 
+-----+---------+-------+-----+---------+ 

https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html

cond = [df.name == df3.name, df.age == df3.age] 
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect() 
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] 

複雜條件下的例子,你可以嘗試:

final_df = df1.join(df2, [df1.name == df2.name , df1.mycount == df2.max_count]) 

還要注意的是,根據該規範 「左」 不是的一部分有效的連接類型: how-str,默認'inner'。內部,外部,left_outer,right_outer,leftsemi之一。

+0

嗨亞倫 - 它仍然沒有工作。 '左'在我的系統上正常工作。我曾嘗試在dfs中重命名列爲(df1.columns == name,genre,cnt),(df2.columns == name,cnt)並嘗試f = df1.join(df2,['name','cnt' ])#got error和f = df1.join(df2,['name','cnt'],'left')##成功...... – Satya

+0

在第一個例子中,你是否嘗試了內部連接?如果它與你的系統一起工作,那麼我的火花版本會有些腥意。 – Satya

+0

@Satya可以請你與我們分享輸入文件,它重現你看到的問題? (例如您在spark.read.csv('file'...)中讀取的'文件') – Yaron

0

我的工作,圍繞各自的DFS火花2.0

我創建了一個單獨的列(「合併」)在列加入對比(「名」,「mycount的」),所以現在我有一列比較,這不是創建任何問題,因爲我只比較一列。

def combine_func(*args): 
    data = '_'.join([str(x) for x in args]) ###converting nonstring to str tehn concatenation 
    return data 
combine_func = udf(combine_func, StringType()) ##register the func as udf 
df1 = df1.withColumn('combined_new_1', combine_new(df1['name'],df1['mycount'])) ###a col having concatenated value from name and mycount columns eg: 'satya_3' 
df2 = df2.withColumn('combined_new_2', combine_new(df2['name2'],df2['max_count'])) 
#df1.columns == 'name','genre', 'mycount', 'combined_new_1' 
#df2.columns == 'name2', 'max_count', 'combined_new_2' 
#Now join 
final_df = df1.join(df2,df1.combined_new_1 == df2.combined_new_2, 'inner') 
#final_df = df1.join(df2,df1.combined_new_1 == df2.combined_new_2, 'inner').select('the columns you want') 
final_df.show() ####It is showing the result, Trust me. 

請不要跟隨,直到除非你是在趕時間,對於一個可靠的解決方案更好的搜索。

2

當我試圖加入兩個DataFrames,其中一個是GroupedData時,我遇到了同樣的問題。它在我內部聯接之前緩存GroupedData DataFrame時適用於我。爲您的代碼,請嘗試:

df1 = df.groupBy("name", "genre").count().cache() # added cache() 
df2 = df1.groupby('name').agg(F.max("count").alias("max_count")).cache() # added cache() 
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count)) # no change 
+0

@約翰 - 是的,但它無法理解爲什麼。你可以請你解釋一下,爲什麼它的工作以及爲什麼Not-cached版本不是。 – Satya

+1

@Satya我的理解是,給定Spark的懶惰評估機制,如果我們在加入df1和df2之前不緩存df1和df2,Spark將會在發出join命令時動態地分別創建df1和df2。鑑於錯誤代碼「無法評估表達式:count(1)」,似乎Spark停留在找到多次計數值的循環中。 – Johann