2016-09-20 114 views
2

我在使用python上的Spark Data Frames在兩個數據框上進行連接時遇到了一些麻煩。我有兩個數據框,我不得不改變列的名稱,以使它們對於每個數據框都是唯一的,所以後來我可以告訴哪一列是哪一列。我這樣做是爲了重命名列(firstDf和secondDf是使用功能createDataFrame創建星火DataFrames):我重複了這個第二個數據幀Pyspark DataFrame - 如何使用變量進行連接?

oldColumns = firstDf.schema.names 
newColumns = list(map(lambda x: "{}.{}".format('firstDf', x), oldColumns)) 
firstDf = firstDf.toDF(*newColumns) 

。然後我試圖加入他們的行列,使用下面的代碼:

from pyspark.sql.functions import * 

firstColumn = 'firstDf.firstColumn' 
secondColumn = 'secondDf.firstColumn' 
joinedDF = firstDf.join(secondDf, col(firstColumn) == col(secondColumn), 'inner') 

使用它,這樣我得到以下錯誤:

AnalysisException "cannot resolve 'firstDf.firstColumn' given input columns: [firstDf.firstColumn, ...];"

這僅是爲了說明該列輸入列陣列中存在。

如果我不重命名DataFrames列我可以在使用這段代碼加入其中:

joinedDf = firstDf.join(secondDf, firstDf.firstColumn == secondDf.firstColumn, 'inner') 

但是,這給我一份有曖昧列名的數據幀。

關於如何解決這個問題的任何想法?

回答

0

一般來說,不要在名稱中使用點。這些都有特殊含義(可用於確定表格或訪問struct字段),並需要一些額外的工作才能被正確識別。

對於相等連接所有你需要的是一個列名:

from pyspark.sql.functions import col 

firstDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn")) 
secondDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn")) 

column = 'firstColumn' 
firstDf.join(secondDf, [column], 'inner') 

## DataFrame[firstColumn: bigint, secondColumn: string, secondColumn: string] 

對於複雜的情況下,使用表別名:

直接
firstColumn = 'firstDf.firstColumn' 
secondColumn = 'secondDf.firstColumn' 

firstDf.alias("firstDf").join(
    secondDf.alias("secondDf"), 
    # After alias prefix resolves to table name 
    col(firstColumn) == col(secondColumn), 
    "inner" 
) 

## DataFrame[firstColumn: bigint, secondColumn: string, firstColumn: bigint, secondColumn: string] 

您還可以使用父幀:

column = 'firstColumn' 

firstDf.join(secondDf, firstDf[column] == secondDf[column]) 
+0

感謝您的回覆,特別是關於不使用名稱中的點的提示。第一種方法是有效的,但我需要連接的DataFrame爲兩個連接的DataFrame的每一列都有唯一的列名。儘管如此,按照建議使用表別名給出了我在問題中顯示的同樣的AnalysisException錯誤。 –

+0

它應該工作得很好。我添加了一個完全可重複的例子的表格定義。 – zero323

+0

對不起,我只是意識到改變點使它工作。再次感謝您的回覆! –