2017-01-30 108 views
0

我需要使用pySpark遍歷dataframe,就像我們可以使用for循環迭代一組值。以下是我寫的代碼。這段代碼的問題是使用pySpark對數據框的每一行進行迭代

  1. 我必須使用收集它打破了並行
  2. 我無法從數據框的功能打印任何值funcRowIter
  3. 我不能破環,一旦我有找到的比賽。

我必須這樣做,在pySpark,不能用熊貓此:

from pyspark.sql.functions import * 
from pyspark.sql import HiveContext 
from pyspark.sql import functions 
from pyspark.sql import DataFrameWriter 
from pyspark.sql.readwriter import DataFrameWriter 
from pyspark import SparkContext 

sc = SparkContext() 
hive_context = HiveContext(sc) 

tab = hive_context.sql("select * from update_poc.test_table_a") 

tab.registerTempTable("tab") 
print type(tab) 

df = tab.rdd 

def funcRowIter(rows): 
    print type(rows) 
     if(rows.id == "1"): 
      return 1 

df_1 = df.map(funcRowIter).collect() 
print df_1 

回答

1

看來,你的目標是顯示一個特定行。 您可以使用.filter,然後使用.collect

例如,

row_1 = rdd.filter(lambda x: x.id==1).collect() 

但是,它不會是有效的嘗試疊代的數據幀這種方式。

+0

我試圖做到這一點沒有收集()怎麼收集將打破並行 –

0

而不是使用df_1 = df.map(funcRowIter).collect()你應該嘗試UDF。希望這會有所幫助。

from pyspark.sql.functions import struct 
from pyspark.sql.functions import * 
def funcRowIter(rows): 
    print type(rows) 
    if(row is nor None and row.id is not None) 
     if(rows.id == "1"): 
      return 1 
A = udf(funcRowIter, ArrayType(StringType())) 
z = df.withColumn(data_id, A(struct([df[x] for x in df.columns]))) 
z.show() 

collect()永遠不會成爲非常大的數據很好的選擇即數百萬條記錄