2017-08-15 276 views
0

我有一個RDD的分區包含元素(熊貓數據框,因爲它發生),可以很容易地變成行列表。把它看成是看起來像這樣RDD的pyspark行列表DataFrame

rows_list = [] 
for word in 'quick brown fox'.split(): 
    rows = [] 
    for i,c in enumerate(word): 
     x = ord(c) + i 
     row = pyspark.sql.Row(letter=c, number=i, importance=x) 
     rows.append(row) 
    rows_list.append(rows) 
rdd = sc.parallelize(rows_list) 
rdd.take(2) 

這給

[[Row(importance=113, letter='q', number=0), 
    Row(importance=118, letter='u', number=1), 
    Row(importance=107, letter='i', number=2), 
    Row(importance=102, letter='c', number=3), 
    Row(importance=111, letter='k', number=4)], 
[Row(importance=98, letter='b', number=0), 
    Row(importance=115, letter='r', number=1), 
    Row(importance=113, letter='o', number=2), 
    Row(importance=122, letter='w', number=3), 
    Row(importance=114, letter='n', number=4)]] 

我希望把它變成一個Spark數據幀。我希望我可以做

rdd.toDF() 

,但給人的無用結構

DataFrame[_1: struct<importance:bigint,letter:string,number:bigint>, 
      _2: struct<importance:bigint,letter:string,number:bigint>, 
      _3: struct<importance:bigint,letter:string,number:bigint>, 
      _4: struct<importance:bigint,letter:string,number:bigint>, 
      _5: struct<importance:bigint,letter:string,number:bigint>] 

我真正想要的是一個3列數據框,如該

desired_df = sql_context.createDataFrame(sum(rows_list, [])) 

讓我可執行如下操作:

desired_df.agg(pyspark.sql.functions.sum('number')).take(1) 

並得到答案23.

什麼是正確的方式去做這件事?

回答

1

當您需要行的RDD時,您擁有行列表的RDD;您可以將rddflatMap放在一起,然後將其轉換爲數據幀:

rdd.flatMap(lambda x: x).toDF().show() 

+----------+------+------+ 
|importance|letter|number| 
+----------+------+------+ 
|  113|  q|  0| 
|  118|  u|  1| 
|  107|  i|  2| 
|  102|  c|  3| 
|  111|  k|  4| 
|  98|  b|  0| 
|  115|  r|  1| 
|  113|  o|  2| 
|  122|  w|  3| 
|  114|  n|  4| 
|  102|  f|  0| 
|  112|  o|  1| 
|  122|  x|  2| 
+----------+------+------+ 

import pyspark.sql.functions as F 

rdd.flatMap(lambda x: x).toDF().agg(F.sum('number')).show() 
+-----------+ 
|sum(number)| 
+-----------+ 
|   23| 
+-----------+