2017-08-30 48 views
0

PySpark有什麼具體的方式來綁定兩個數據框,因爲我們在c中綁定了嗎?PySpark列明智綁定

實施例:

  1. 數據幀1具有10列
  2. 數據幀2具有1列

我需要cbind兩個數據幀,併爲在PySpark一個數據幀。

回答

0

首先,讓我們創建dataframes:

df1 = spark.createDataFrame(sc.parallelize([10*[c] for c in range(10)]), ["c"+ str(i) for i in range(10)]) 
df2 = spark.createDataFrame(sc.parallelize([[c] for c in range(10, 20, 1)]), ["c10"]) 
    +---+---+---+---+---+---+---+---+---+---+ 
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9| 
    +---+---+---+---+---+---+---+---+---+---+ 
    | 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 
    | 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 
    | 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 
    | 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 
    | 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 
    | 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 
    | 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 
    | 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 
    | 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 
    | 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 
    +---+---+---+---+---+---+---+---+---+---+ 

    +---+ 
    |c10| 
    +---+ 
    | 10| 
    | 11| 
    | 12| 
    | 13| 
    | 14| 
    | 15| 
    | 16| 
    | 17| 
    | 18| 
    | 19| 
    +---+ 

然後我們想唯一標識行,有一個RDD功能,可以做到這zipWithIndex

from pyspark.sql.types import LongType 
from pyspark.sql import Row 
def zipindexdf(df): 
    schema_new = df.schema.add("index", LongType(), False) 
    return df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new) 

df1_index = zipindexdf(df1) 
df1_index.show() 
df2_index = zipindexdf(df2) 
df2_index.show() 

    +---+---+---+---+---+---+---+---+---+---+-----+ 
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|index| 
    +---+---+---+---+---+---+---+---+---+---+-----+ 
    | 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 
    | 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 
    | 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 
    | 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 
    | 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 
    | 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 
    | 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 
    | 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 
    | 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 
    | 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 
    +---+---+---+---+---+---+---+---+---+---+-----+ 

    +---+-----+ 
    |c10|index| 
    +---+-----+ 
    | 10| 0| 
    | 11| 1| 
    | 12| 2| 
    | 13| 3| 
    | 14| 4| 
    | 15| 5| 
    | 16| 6| 
    | 17| 7| 
    | 18| 8| 
    | 19| 9| 
    +---+-----+ 

最後,我們可以加入他們的行列:

df = df1_index.join(df2_index, "index", "inner") 

    +-----+---+---+---+---+---+---+---+---+---+---+---+ 
    |index| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10| 
    +-----+---+---+---+---+---+---+---+---+---+---+---+ 
    | 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 10| 
    | 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 17| 
    | 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 16| 
    | 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 19| 
    | 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 15| 
    | 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 11| 
    | 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 13| 
    | 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 18| 
    | 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 12| 
    | 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 14| 
    +-----+---+---+---+---+---+---+---+---+---+---+---+ 
+0

非常感謝。它的做工精細:) –

+0

這不適用於可能存儲在不同分區中的兩個單獨的大型DataFrame,並且每個DataFrame在不同行上的分區之間分隔開來。從[文檔](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id)***「當前實現將分區ID在高31位,並在每個分區內的記錄號在低33位「*** – Clay

+1

你是對的,我不能相信我寫了... ...'MonotonicallyIncreasingID'的計數有一個不同的每個任務的起源 – MaFF

0

要獲取ID爲單調增加的列,請在每個DataFrame上使用以下代碼,其中colName是要按每個DataFrame排序的列名。

import pyspark.sql.functions as F 
from pyspark.sql.window import Window as W 

window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow) 

df = df\ 
.withColumn('int', F.lit(1))\ 
.withColumn('consec_id', F.sum('int').over(window))\ 
.drop('int')\ 

要檢查一切都排隊正確,使用下面的代碼來看看尾或數據幀的最後rownums

rownums = 10 
df.where(F.col('consec_id')>df.count()-rownums).show() 

使用下列代碼行把目光從start_row到數據幀的end_row

start_row = 20 
end_row = 30 
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show() 

更新

的作品的另一種方法是RDD方法zipWithIndex()。簡單地修改現有的數據幀具有連續的ID使用這種RDD方法的一列,I:

  1. 轉換的DF到RDD,
  2. 施加的zipWithIndex()方法,
  3. 轉換返回的RDD爲數據幀,
  4. 轉換的數據幀到RDD,
  5. 映射的RDD lambda函數到原始數據幀與所述索引的RDD行對象結合,
  6. 轉換的最終RDD爲與數據幀原始列名+由zipWithIndex()創建的整數的ID列。

我也嘗試過修改原始DataFrame的方法,該索引列包含類似於@MaFF所做的zipWithIndex()的輸出,但結果更慢。窗函數比這兩者中的任何一個都快一個數量級。大部分時間增加似乎是將DataFrame轉換爲RDD並再次返回。

請讓我知道是否有更快的方式將zipWithIndex() RDD方法的輸出添加爲原始DataFrame中的列。

對42,000行90列DataFrame進行測試得出以下結果。

import time 

def test_zip(df): 
    startTime = time.time() 
    df_1 = df \ 
    .rdd.zipWithIndex().toDF() \ 
    .rdd.map(lambda row: (row._1) + (row._2,)) \ 
    .toDF(df_all_indexed.columns +['consec_id']) 

    start_row = 20000 
    end_row = 20010 
    df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show() 
    endTime = time.time() - startTime 
    return str(round(endTime,3)) + " seconds" 

[test_zip(df) for _ in range(5)] 

['590.813秒, '390.574秒, '360.074秒, '350.436秒, '350.636秒 ']

import time 
import pyspark.sql.functions as F 
from pyspark.sql.window import Window as W 

def test_win(df): 
    startTime = time.time() 
    window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow) 
    df_2 = df \ 
    .withColumn('int', F.lit(1)) \ 
    .withColumn('IDcol', F.sum('int').over(window)) \ 
    .drop('int') 

    start_row = 20000 
    end_row = 20010 
    df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show() 
    endTime = time.time() - startTime 
    return str(round(endTime,3)) + " seconds" 

[test_win(df) for _ in range(5)] 

[' 4.19秒, '4.508秒', '4.099秒', '4.012秒', '4.045秒']

import time 
from pyspark.sql.types import StructType, StructField 
import pyspark.sql.types as T 

def test_zip2(df): 
    startTime = time.time() 
    schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)]) 
    df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new) 

    start_row = 20000 
    end_row = 20010 
    df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show() 
    endTime = time.time() - startTime 
    return str(round(endTime,3)) + " seconds" 

[test_zip2(testdf) for _ in range(5)] 

['820.795秒, '610.689秒, '580.181秒, '580.01秒, '570.765秒]