2017-08-03 51 views
2

我正在使用Spark 2.1.1和dataframe。這裏是我的輸入數據框:轉換數據框:幾列按順序排列

+----+---------+---------+-------+ 
| key|parameter|reference| subkey| 
+----+---------+---------+-------+ 
|key1|  45|  10|subkey1| 
|key1|  45|  20|subkey2| 
|key2|  70|  40|subkey2| 
|key2|  70|  30|subkey1| 
+----+---------+---------+-------+ 

我需要的數據幀轉換到下一:

result data (by pandas): 
+-----+-----------+ 
|label| features| 
+-----+-----------+ 
| 45|[10.0,20.0]| 
| 70|[30.0,40.0]| 
+-----+-----------+ 

我能做的改造與大熊貓的幫助:

def convert_to_flat_by_pandas(df): 
    pandas_data_frame = df.toPandas() 
    all_keys = pandas_data_frame['key'].unique() 

    flat_values = [] 
    for key in all_keys: 
     key_rows = pandas_data_frame.loc[pandas_data_frame['key'] == key] 
     key_rows = key_rows.sort_values(by=['subkey']) 

     parameter_values = key_rows['parameter'] 
     parameter_value = parameter_values.real[0]   

     key_reference_value = [reference_values for reference_values in key_rows['reference']] 

     flat_values.append((parameter_value, key_reference_value)) 

    loaded_data = [(label, Vectors.dense(features)) for (label, features) in flat_values] 
    spark_df = spark.createDataFrame(loaded_data, ["label", "features"]) 

    return spark_df 

看來,我需要使用GroupBy,但我不明白如何排序和轉換組(幾行)單行。

源工作樣品(有熊貓的幫助):https://github.com/constructor-igor/TechSugar/blob/master/pythonSamples/pysparkSamples/df_flat.py

隨着2個回答可以幫助我得到2個可能的解決方案:

UPD1解決方案#1

def convert_to_flat_by_sparkpy(df): 
    subkeys = df.select("subkey").dropDuplicates().collect() 
    subkeys = [s[0] for s in subkeys] 
    print('subkeys: ', subkeys) 
    assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features") 
    spark_df = assembler.transform(df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))))  
    spark_df = spark_df.withColumnRenamed("parameter", "label") 
    spark_df = spark_df.select("label", "features") 
    return spark_df 

UPD1解決方案#2

def convert_to_flat_by_sparkpy_v2(df): 
    spark_df = df.orderBy("subkey") 
    spark_df = spark_df.groupBy("key").agg(first(col("parameter")).alias("label"), collect_list("reference").alias("features")) 
    spark_df = spark_df.select("label", "features") 
    return spark_df 
+0

我需要pyspark數據幀分組(而不是在熊貓) – constructor

+0

是什麼features'的'類型,可以你顯示'printSchema'的輸出? –

+0

模式:'模式結果的數據幀: 根 | - label:string(nullable = true) | - features:vector(nullable = true)' – constructor

回答

1

對於已給出的有限的樣本的數據,可以轉換該數據幀到寬幅與子項作爲標題,然後使用VectorAssembler收集它們作爲特徵:

from pyspark.sql.functions import first, col 
from pyspark.ml.feature import VectorAssembler 

assembler = VectorAssembler().setInputCols(["subkey1", "subkey2"]).setOutputCol("features") 

assembler.transform(
    df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))) 
).show() 
+----+---------+-------+-------+-----------+ 
| key|parameter|subkey1|subkey2| features| 
+----+---------+-------+-------+-----------+ 
|key1|  45|  10|  20|[10.0,20.0]| 
|key2|  70|  30|  40|[30.0,40.0]| 
+----+---------+-------+-------+-----------+ 

更新動態子項:

說,如果你有這樣一個數據幀:

df.show() 
+----+---------+---------+-------+  
| key|parameter|reference| subkey| 
+----+---------+---------+-------+ 
|key1|  45|  10|subkey1| 
|key1|  45|  20|subkey2| 
|key2|  70|  40|subkey2| 
|key2|  70|  30|subkey1| 
|key2|  70|  70|subkey3| 
+----+---------+---------+-------+ 

收集所有唯一的子鍵首先,然後使用子項創建彙編:

subkeys = df.select("subkey").dropDuplicates().rdd.map(lambda r: r[0]).collect() 
assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features") 

assembler.transform( 
    df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))).na.fill(0) 
).show() 
+----+---------+-------+-------+-------+----------------+ 
| key|parameter|subkey1|subkey2|subkey3|  features| 
+----+---------+-------+-------+-------+----------------+ 
|key1|  45|  10|  20|  0| [20.0,10.0,0.0]| 
|key2|  70|  30|  40|  70|[40.0,30.0,70.0]| 
+----+---------+-------+-------+-------+----------------+ 
+0

這是一個很好的示例,但我有2個問題與我的真實代碼:我有隨機的子鍵值,我不能創建Vectors.dense df.features)' – constructor

+0

而不是'subkeys = df.select(「subkey」)。dropDuplicates()。rdd.map(lambda r:r [0])。collect()'我加了''subkeys = df.select 「subkey」)。dropDuplicates()。collect() subkeys = [s [0] for s in subkeys]' – constructor

2

您可以使用GROUPBY和collect_list函數來獲取輸出

import org.apache.spark.sql.functions._ 

df.groupBy("parameter").agg(collect_list("reference").alias("features")) 

df1.withColumnRenamed("parameter", "label") 

輸出:

+---------+--------+ 
|parameter|features| 
+---------+--------+ 
|  45|[10, 20]| 
|  70|[40, 30]| 
+---------+--------+ 

希望這有助於!

+0

我認爲這會創建一個功能數組而不是矢量 –

+0

我需要通過子項對「功能」進行排序。預期結果:'[10,20]'和[30,40]' – constructor

+0

列「參數」上的值不是唯一的。恐怕我們不能使用'groupBy'中的列。 – constructor