2016-07-06 52 views
-1

在PySpark中,對於RDD的每個元素,我試圖獲得一個元素的數組。然後我想將結果轉換爲DataFrame。無法在RDD上應用flatMap

我有以下代碼:

simulation = housesDF.flatMap(lambda house: goThroughAB(jobId, house)) 
    print simulation.toDF().show() 

在那,我調用這些輔助方法:

def simulate(jobId, house, a, b): 
    return Row(jobId=jobId, house=house, a=a, b=b, myVl=[i for i in range(10)]) 

def goThroughAB(jobId, house): 
    print "in goThroughAB" 
    results = [] 
    for a in as: 
    for b in bs: 
     results += simulate(jobId, house, a, b) 
    print type(results) 
    return results 

奇怪的是print "in goThroughAB"沒有任何影響,因爲有上沒有輸出屏幕。

不過,我得到這個錯誤:

---> 23 print simulation.toDF().show() 
    24 
    25 dfRow = sqlContext.createDataFrame(simulationResults) 

/databricks/spark/python/pyspark/sql/context.py in toDF(self, schema, sampleRatio) 
    62   [Row(name=u'Alice', age=1)] 
    63   """ 
---> 64   return sqlContext.createDataFrame(self, schema, sampleRatio) 
    65 
    66  RDD.toDF = toDF 

/databricks/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio) 
    421 
    422   if isinstance(data, RDD): 
--> 423    rdd, schema = self._createFromRDD(data, schema, samplingRatio) 
    424   else: 
    425    rdd, schema = self._createFromLocal(data, schema) 

/databricks/spark/python/pyspark/sql/context.py in _createFromRDD(self, rdd, schema, samplingRatio) 
    308   """ 
    309   if schema is None or isinstance(schema, (list, tuple)): 
--> 310    struct = self._inferSchema(rdd, samplingRatio) 
    311    converter = _create_converter(struct) 
    312    rdd = rdd.map(converter) 

/databricks/spark/python/pyspark/sql/context.py in _inferSchema(self, rdd, samplingRatio) 
    261 
    262   if samplingRatio is None: 
--> 263    schema = _infer_schema(first) 
    264    if _has_nulltype(schema): 
    265     for row in rdd.take(100)[1:]: 

/databricks/spark/python/pyspark/sql/types.py in _infer_schema(row) 
    829 
    830  else: 
--> 831   raise TypeError("Can not infer schema for type: %s" % type(row)) 
    832 
    833  fields = [StructField(k, _infer_type(v), True) for k, v in items] 

TypeError: Can not infer schema for type: <type 'str'> 

在此行中:

print simulation.toDF().show() 

所以看起來不執行goThroughAB,這意味着flatMap可能不會被執行。

代碼的問題是什麼?

+0

打印語句是在分佈式環境中毫無用處。而這個問題也沒有例子'住宅DF'。 – zero323

回答

0

首先,您不是在驅動程序上打印,而是在Spark執行程序上打印。如您所知,執行程序是並行執行Spark任務的遠程進程。他們打印該行,但在自己的控制檯上。您不知道哪個執行程序運行某個分區,您不應該在分佈式環境中依賴打印語句。

然後問題是,當你想創建DataFrame時,Spark需要知道表的模式。如果您沒有指定它,它將使用採樣率並檢查一些行以確定它們的類型。如果你不指定採樣率,它只會檢查第一行。這發生在你的情況下,你可能有一個字段,其類型不能確定(它可能爲空)。

要解決此問題,您應該將模式添加到toDF()方法或指定非零採樣率。該架構可以像這樣預先創建:

schema = StructType([StructField("int_field", IntegerType()), 
        StructField("string_field", StringType())]) 
0

此代碼不正確。 results += simulate(jobId, house, a, b)會嘗試連接行並失敗。如果您沒有看到TypeError,則表示未達到,並且您的代碼在其他位置失敗,可能是在創建housesDF時。

-1

其他人指出的關鍵問題是results += simulate(jobId, house, a, b),當simulation返回一個Row對象時,它將不起作用。您可以嘗試使results a list然後使用list.append。但爲什麼不是yield

def goThroughAB(jobId, house): 
    print "in goThroughAB" 
    results = [] 
    for a in as: 
    for b in bs: 
     yield simulate(jobId, house, a, b) 

當你+兩行對象發生了什麼?

In[9]: 
from pyspark.sql.types import Row 
Row(a='a', b=1) + Row(a='b', b=2) 

Out[9]: 
('a', 1, 'b', 2) 

然後toDF採樣的第一個元素,並發現這是一個str(你的jobId),因此抱怨

TypeError: Can not infer schema for type: <type 'str'>