2016-10-14 452 views
1

我將用Pandas編寫的一些代碼轉換爲PySpark。該代碼有很多for循環來創建可變數量的列,具體取決於用戶指定的輸入。更有效的方式來循環通過PySpark DataFrame並創建新列

我使用星火1.6.x版,用下面的示例代碼:

from pyspark.sql import SQLContext 
from pyspark.sql import functions as F 
import pandas as pd 
import numpy as np 

# create a Pandas DataFrame, then convert to Spark DataFrame 
test = sqlContext.createDataFrame(pd.DataFrame({'val1': np.arange(1,11)})) 

這給我留下了

+----+ 
|val1| 
+----+ 
| 1| 
| 2| 
| 3| 
| 4| 
| 5| 
| 6| 
| 7| 
| 8| 
| 9| 
| 10| 
+----+ 

我圈了很多的代碼,例如以下:

for i in np.arange(2,6).tolist(): 
    test = test.withColumn('val_' + str(i), F.lit(i ** 2) + test.val1) 

導致:

+----+-----+-----+-----+-----+ 
|val1|val_2|val_3|val_4|val_5| 
+----+-----+-----+-----+-----+ 
| 1| 5| 10| 17| 26| 
| 2| 6| 11| 18| 27| 
| 3| 7| 12| 19| 28| 
| 4| 8| 13| 20| 29| 
| 5| 9| 14| 21| 30| 
| 6| 10| 15| 22| 31| 
| 7| 11| 16| 23| 32| 
| 8| 12| 17| 24| 33| 
| 9| 13| 18| 25| 34| 
| 10| 14| 19| 26| 35| 
+----+-----+-----+-----+-----+ 

**問題:**如何重寫上述循環以提高效率?

我注意到,我的代碼運行速度較慢,因爲Spark在每組循環上花費了大量時間(即使在像2GB文本輸入這樣的小數據集上)。

感謝

回答

1

重複調用JVM方法的開銷很小,但對於單獨的循環來說應該不成問題。您可以通過使用單個選擇略微改善它:

df = spark.range(1, 11).toDF("val1") 

def make_col(i): 
    return (F.pow(F.lit(i), 2) + F.col("val1")).alias("val_{0}".format(i)) 

spark.range(1, 11).toDF("val1").select("*", *(make_col(i) for i in range(2, 6))) 

我也會避免使用NumPy類型。與普通Python對象相比,初始化NumPy對象通常更昂貴,而Spark SQL不支持NumPy類型,因此需要進行一些額外的轉換。

+0

謝謝,這個工程。然後我會考慮如何將上面的代碼應用於我的代碼。我使用Spark 1.6.x,所以在運行代碼時出現錯誤,主要是'* .toDF(「val1」)'抱怨期待模式類型。應該很容易修復,因爲它在Spark 2.0.x上運行良好 –

-1

一個withColumn將在整個RDD工作。因此,對於要添加的每個列使用該方法通常不是一個好習慣。有一種方法可以處理地圖函數中的列及其數據。由於一個映射函數在這裏完成這項工作,所以添加新列及其數據的代碼將並行完成。

a。您可以根據計算收集新值。

b。添加這些新的列值到主RDD如下

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

這裏行,是行的地圖方法

c中的參考。如下創建新模式

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

d。添加到舊模式

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e。用新的列創建新的數據幀

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+1

謝謝,但斯卡拉使它有點難以遵循。我在使用整個DataFrame上的Column的時候明白你的意思。我無法將自己的頭圍繞在如何使用map來使其工作。 –

+0

當您使用地圖時,您正在對每一行執行操作。因此,您要做的是爲每一行創建新列的新模式,爲這些列準備數據,然後將上述新模式添加到舊模式(可從數據框中獲取),然後使用新列創建新數據框。你可以考慮在Python中的上述步驟,如果你正在尋找它 – Ramzy

相關問題