
In Scala Spark I can easily add a column to an existing DataFrame, taking it from another DataFrame:

val newDf = df.withColumn("date_min", anotherDf("date_min"))

Doing the same thing in PySpark results in an AnalysisException.

Here is what I am doing:

minDf.show(5) 
maxDf.show(5) 
+--------------------+ 
|   date_min| 
+--------------------+ 
|2016-11-01 10:50:...| 
|2016-11-01 11:46:...| 
|2016-11-01 19:23:...| 
|2016-11-01 17:01:...| 
|2016-11-01 09:00:...| 
+--------------------+ 
only showing top 5 rows 

+--------------------+ 
|   date_max| 
+--------------------+ 
|2016-11-01 10:50:...| 
|2016-11-01 11:46:...| 
|2016-11-01 19:23:...| 
|2016-11-01 17:01:...| 
|2016-11-01 09:00:...| 
+--------------------+ 
only showing top 5 rows 

and then the line that causes the error:

newDf = minDf.withColumn("date_max", maxDf["date_max"]) 

AnalysisExceptionTraceback (most recent call last) 
<ipython-input-13-7e19c841fa51> in <module>() 
     2 maxDf.show(5) 
     3 
----> 4 newDf = minDf.withColumn("date_max", maxDf["date_max"]) 

/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col) 
    1491   """ 
    1492   assert isinstance(col, Column), "col should be Column" 
-> 1493   return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx) 
    1494 
    1495  @ignore_unicode_prefix 

/opt/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    67            e.java_exception.getStackTrace())) 
    68    if s.startswith('org.apache.spark.sql.AnalysisException: '): 
---> 69     raise AnalysisException(s.split(': ', 1)[1], stackTrace) 
    70    if s.startswith('org.apache.spark.sql.catalyst.analysis'): 
    71     raise AnalysisException(s.split(': ', 1)[1], stackTrace) 

AnalysisException: u'resolved attribute(s) date_max#67 missing from date_min#66 in operator !Project [date_min#66, date_max#67 AS date_max#106];;\n!Project [date_min#66, date_max#67 AS date_max#106]\n+- Project [date_min#66]\n +- Project [cast((cast(date_min#6L as double)/cast(1000 as double)) as timestamp) AS date_min#66, cast((cast(date_max#7L as double)/cast(1000 as double)) as timestamp) AS date_max#67]\n  +- SubqueryAlias df, `df`\n   +- LogicalRDD [idvisiteur#5, date_min#6L, date_max#7L, sales_sum#8, sales_count#9L]\n' 

You could try a join, if the DFs have the same length –

Answers


The short answer is that this is not supported by the Spark DataFrame API, at least not in Spark 2.x. You can, however, write a helper function to achieve something similar.

First, let's create some test data:

minDf = sc.parallelize(['2016-11-01','2016-11-02','2016-11-03']).map(lambda x: (x,)).toDF(['date_min']) 
maxDf = sc.parallelize(['2016-12-01','2016-12-02','2016-12-03']).map(lambda x: (x,)).toDF(['date_max']) 

Then you can use zip to merge the two DataFrames, provided both DataFrames have exactly the same partition layout:

from pyspark.sql.types import StructType

def zip_df(l, r):
    # Pair rows positionally with RDD.zip (requires an identical partition layout),
    # keep the first column of each side, and rebuild a two-column DataFrame.
    return (l.rdd.zip(r.rdd)
             .map(lambda x: (x[0][0], x[1][0]))
             .toDF(StructType([l.schema[0], r.schema[0]])))

combined = zip_df(minDf, maxDf.select('date_max')) 
combined.show() 
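
Note that RDD.zip requires both RDDs to have the same number of partitions and the same number of elements per partition. If that does not hold, a minimal sketch of a fallback (assuming the rows of the two DataFrames correspond one-to-one by position; zip_df_by_index is a hypothetical helper, not part of the answer above) is to tag each RDD with zipWithIndex and join on the index:

from pyspark.sql.types import StructType

def zip_df_by_index(l, r):
    # Tag every row of both RDDs with its position, join on that position,
    # then rebuild a DataFrame carrying the columns of both inputs.
    # Tolerates different partition layouts at the cost of a shuffle.
    li = l.rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
    ri = r.rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
    return (li.join(ri)
              .sortByKey()
              .map(lambda x: tuple(x[1][0]) + tuple(x[1][1]))
              .toDF(StructType(list(l.schema) + list(r.schema))))

combined = zip_df_by_index(minDf, maxDf.select('date_max'))
combined.show()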

For the record - this is not supported in Spark, and it is not specific to PySpark. – zero323


Hope this helps!

import pyspark.sql.functions as f 

minDf = sc.parallelize([['2016-11-01 10:50:00'],['2016-11-01 11:46:00']]).toDF(["date_min"]) 
maxDf = sc.parallelize([['2016-11-01 10:50:00'],['2016-11-01 11:46:00']]).toDF(["date_max"]) 

# since there is no common column between these two dataframes add row_index so that it can be joined 
minDf=minDf.withColumn('row_index', f.monotonically_increasing_id()) 
maxDf=maxDf.withColumn('row_index', f.monotonically_increasing_id()) 

minDf = minDf.join(maxDf, on=["row_index"]).sort("row_index").drop("row_index") 
minDf.show() 

The output is:

+-------------------+-------------------+ 
|   date_min|   date_max| 
+-------------------+-------------------+ 
|2016-11-01 10:50:00|2016-11-01 10:50:00| 
|2016-11-01 11:46:00|2016-11-01 11:46:00| 
+-------------------+-------------------+ 
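
One caveat with monotonically_increasing_id: the generated ids are only guaranteed to be increasing, not consecutive, and they depend on the partitioning, so the two DataFrames only line up when both are partitioned the same way. A minimal sketch of a more deterministic variant (assuming minDf and maxDf as created at the top of this answer, and small data, since the unpartitioned window moves everything to a single partition) uses row_number to produce consecutive positions 1, 2, 3, ...:

from pyspark.sql import Window
import pyspark.sql.functions as f

# Tag each DataFrame with an increasing id, then convert it into consecutive
# positions with row_number so the two sides can be joined reliably.
w = Window.orderBy("mono_id")

min_indexed = (minDf.withColumn("mono_id", f.monotonically_increasing_id())
                    .withColumn("row_index", f.row_number().over(w))
                    .drop("mono_id"))
max_indexed = (maxDf.withColumn("mono_id", f.monotonically_increasing_id())
                    .withColumn("row_index", f.row_number().over(w))
                    .drop("mono_id"))

combined = (min_indexed.join(max_indexed, on="row_index")
                       .sort("row_index")
                       .drop("row_index"))
combined.show()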