2016-05-31 137 views
1

該代碼的目的是基於在RDD上運行的「myFunc」方法加載計算一些邏輯以獲得並行化優勢。pyspark中的傳遞函數

下列行: df_rdd = ParallelBuild()運行()地圖(拉姆達線:線)。.persist() R = df_rdd.map(ParallelBuild()myFunc的)

給我出口0閱讀谷歌認爲,Spark是懶惰的評估,使一些操作會觸發該效果,我補充一下:

r.count()給我:

TypeError: 'JavaPackage' object is not callable 

注意的事情是: R = df_rdd。地圖(Parall elBuild()。myFunc)

給「pipelinedrdd」不確定那是什麼,但看起來像一些轉變?數據= [(1,'a'),(1,'b'),(1,'c'),(2,'d'), )(3,'r'),(4,'a'),(2,'t'),(3,'y'),(1,'f')] df = sqlContext.createDataFrame ,schema = ['uid','address_uid'])

直接在我的主要功能,然後事情工作得很好。但顯然我鬆開了我的代碼的模塊化部分。

代碼:

from pyspark import SparkContext 
from pyspark.sql import SQLContext, HiveContext 
import csv, io, StringIO 
from pyspark.sql.functions import * 
from pyspark.sql import Row 
from pyspark.sql import * 
from pyspark.sql import functions as F 
from pyspark.sql.functions import asc, desc 
sc = SparkContext("local", "Summary Report") 
sqlContext = SQLContext(sc) 


class ParallelBuild(object): 

def myFunc(self, s): 
    l = s.split(',') 
    print l[0], l[1] 
    return l[0] 


def list_to_csv_str(x): 
    output = StringIO.StringIO("") 
    csv.writer(output).writerow(x) 
    return output.getvalue().strip() 

def run(self): 
    data = [(1,'a'), (1,'b'), (1,'c'), (2,'d'), (3,'r'), (4,'a'), (2,'t'), (3,'y'), (1,'f')] 
    df = sqlContext.createDataFrame(data, schema= ['uid', 'address_uid']) 
    return df 


if __name__ == "__main__": 

    df_rdd = ParallelBuild().run().map(lambda line: line).persist() 
    r = df_rdd.map(ParallelBuild().myFunc) 
    r.count() 

回答

0

好了,你的主要問題是「爲什麼沒有打印任何東西」答案有兩部分。

  1. 在分佈式計算中你不能真的print。所以你的功能myFunc不會打印任何東西給驅動程序。其原因相當複雜,所以我會指導您進入this page,以獲取更多關於爲什麼打印在Spark中無法正常工作的信息。

然而,呼籲r.count()應該打印出來9。爲什麼這不起作用?

  1. 您的功能myFunc沒有多大意義。當你在r = df_rdd.map(ParallelBuild().myFunc)中打電話時,你想通過df_rdd,我想。但這已經是一個DataFrame。此DataFrame的每一行都是類型,如果您致電df_rdd.first(),您將獲得Row(uid=1, address_uid=u'a')。你在做什麼myFunc是試圖使用split,但split是用於字符串對象,並且你有對象。我不知道爲什麼這不是拋出一個錯誤,但你根本不能在對象上調用split。考慮更多沿着r = df_rdd.map(lambda x: x[0])的路線。

因此,我認爲r.count()不起作用,因爲當您撥打myFunc時,有些事情變得混亂。


旁註:

df_rdd = ParallelBuild().run().map(lambda line: line).persist()。運行.map(lambda line: line)不會執行任何操作。您沒有對line進行任何更改,因此請勿運行map作業。代碼爲df_rdd = ParallelBuild().run().persist()