Unzip a list of tuples - PySpark

I asked the opposite question here: Create a tuple out of two columns - PySpark. What I want to do now is unzip a list of tuples sitting in a dataframe column back into two separate lists per row. So, given the dataframe below, turn the v_tuple column back into v1 and v2.

+---------------+---------------+--------------------+ 
|             v1|             v2|             v_tuple| 
+---------------+---------------+--------------------+ 
|[2.0, 1.0, 9.0]|[9.0, 7.0, 2.0]|[(2.0,9.0), (1.0,...| 
|[4.0, 8.0, 9.0]|[1.0, 1.0, 2.0]|[(4.0,1.0), (8.0,...| 
+---------------+---------------+--------------------+ 

Based on my columns above, I tried the following without success:

unzip_ = udf(
    lambda l: list(zip(*l)), 
    ArrayType(ArrayType("_1", DoubleType()), ArrayType("_2", DoubleType()))) 

I am using PySpark 1.6.

Answer


You can explode your array, then group it and collect it back:

First, let's create our dataframe:

df = spark.createDataFrame(
    sc.parallelize([
        [[2.0, 1.0, 9.0], [9.0, 7.0, 2.0], [(2.0, 9.0), (1.0, 7.0), (9.0, 2.0)]],
        [[4.0, 8.0, 9.0], [1.0, 1.0, 2.0], [(4.0, 1.0), (8.0, 1.0), (9.0, 2.0)]]
    ]),
    ["v1", "v2", "v_tuple"]
)

Let's add a row id to identify each row uniquely (monotonically_increasing_id is guaranteed unique and increasing, but not consecutive):

import pyspark.sql.functions as psf 
df = df.withColumn("id", psf.monotonically_increasing_id()) 

Now we can explode the column "v_tuple" and create two columns from the two elements of each tuple:

df = df.withColumn("v_tuple", psf.explode("v_tuple")).select(
    "id", 
    psf.col("v_tuple._1").alias("v1"), 
    psf.col("v_tuple._2").alias("v2") 
) 

    +-----------+---+---+ 
    |         id| v1| v2| 
    +-----------+---+---+ 
    |42949672960|2.0|9.0| 
    |42949672960|1.0|7.0| 
    |42949672960|9.0|2.0| 
    |94489280512|4.0|1.0| 
    |94489280512|8.0|1.0| 
    |94489280512|9.0|2.0| 
    +-----------+---+---+ 

Finally, we can group it back by id (note that collect_list does not guarantee that the collected order matches the original arrays, and on Spark 1.6 it may require a HiveContext):

df = df.groupBy("id").agg(
    psf.collect_list("v1").alias("v1"), 
    psf.collect_list("v2").alias("v2") 
) 

    +-----------+---------------+---------------+ 
    |         id|             v1|             v2| 
    +-----------+---------------+---------------+ 
    |42949672960|[2.0, 1.0, 9.0]|[9.0, 7.0, 2.0]| 
    |94489280512|[4.0, 8.0, 9.0]|[1.0, 1.0, 2.0]| 
    +-----------+---------------+---------------+