2017-01-12 129 views
1

我知道PySpark DataFrames是不可變的,所以我想創建一個新的列,該列應用於PySpark DataFrame的現有列。我的數據太大而無法使用collect()。從PySpark DataFrame列中刪除元素

所討論的列是唯一整數的列表的列表(在給定的列表中沒有一個int的重複),例如:

[1] 
[1,2] 
[1,2,3] 
[2,3] 

以上是一個玩具例如,如我的實際數據幀具有列表最大長度爲52個獨特的整數。我想生成一個迭代整列表的列表並刪除每個循環的一個元素的列。要刪除的元素將來自所有列表中唯一元素的集合,在本例中爲[1,2,3]

因此,對於第一次迭代:

移除元件1,使得結果是:

[] 
[2] 
[2,3] 
[2,3] 

對於第二次迭代:

卸下元件2,使得結果是:

[1] 
[1] 
[1,3] 
[3] 

等等,並重覆上面的元素3.

對於每次迭代,我想將結果追加到原始PySpark DataFrame中以運行一些查詢,將此「過濾」列用作原始DataFrame的行篩選器。

我的問題是,如何將PySpark DataFrame的列轉換爲列表?我的數據集很大,所以df.select('columnofintlists').collect()會導致內存問題(例如:Kryo serialization failed: Buffer overflow. Available: 0, required: 1448662. To avoid this, increase spark.kryoserializer.buffer.max value.)。

回答

0

df.toLocalIterator()將返回一個迭代for循環