Passing functions in pyspark. The purpose of this code is to run some logic through the myFunc method on an RDD, so as to get the benefit of parallelization.
The following lines:

df_rdd = ParallelBuild().run().map(lambda line: line).persist()
r = df_rdd.map(ParallelBuild().myFunc)
give me exit 0. Reading around on Google, I gathered that Spark is lazily evaluated, so some action is needed to trigger the computation. So I added:
r.count(), which gives me:
TypeError: 'JavaPackage' object is not callable
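For reference, the "transformations are lazy, actions trigger work" behaviour I mean can be illustrated in plain Python 3, where map() also builds a lazy iterator that does no work until it is consumed (this is just an analogy, not Spark code):

```python
# Analogy in plain Python 3: map() builds a lazy iterator, like a Spark
# transformation; nothing is computed until you consume it (the "action").
side_effects = []

def record(x):
    side_effects.append(x)
    return x * 2

lazy = map(record, [1, 2, 3])   # "transformation": nothing has run yet
assert side_effects == []

result = list(lazy)             # "action": forces evaluation
assert side_effects == [1, 2, 3]
assert result == [2, 4, 6]
```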
The thing to note is that r = df_rdd.map(ParallelBuild().myFunc) gives a 'PipelinedRDD'. Not sure what that is, but it looks like some transformation? If I put

data = [(1,'a'), (1,'b'), (1,'c'), (2,'d'), (3,'r'), (4,'a'), (2,'t'), (3,'y'), (1,'f')]
df = sqlContext.createDataFrame(data, schema=['uid', 'address_uid'])

directly in my main function, then things work fine. But obviously I lose the modular part of my code.
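One thing I noticed while debugging: the rows that createDataFrame produces are Row objects (tuple-like with named fields), not comma-separated strings, so a function built around s.split(',') would not apply to them directly. In plain Python the mismatch looks like this (using namedtuple as a stand-in for pyspark.sql.Row):

```python
from collections import namedtuple

# pyspark.sql.Row behaves like a named tuple, not like a string
Row = namedtuple('Row', ['uid', 'address_uid'])  # stand-in for pyspark.sql.Row
row = Row(uid=1, address_uid='a')

# tuple-style and attribute access both work:
assert row.uid == 1
assert row[1] == 'a'

# but string methods such as split(',') do not exist on it:
assert not hasattr(row, 'split')
```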
Code:
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
import csv, io, StringIO
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark.sql.functions import asc, desc
sc = SparkContext("local", "Summary Report")
sqlContext = SQLContext(sc)
class ParallelBuild(object):

    def myFunc(self, s):
        l = s.split(',')
        print l[0], l[1]
        return l[0]

    def list_to_csv_str(x):
        output = StringIO.StringIO("")
        csv.writer(output).writerow(x)
        return output.getvalue().strip()

    def run(self):
        data = [(1,'a'), (1,'b'), (1,'c'), (2,'d'), (3,'r'), (4,'a'), (2,'t'), (3,'y'), (1,'f')]
        df = sqlContext.createDataFrame(data, schema=['uid', 'address_uid'])
        return df

if __name__ == "__main__":
    df_rdd = ParallelBuild().run().map(lambda line: line).persist()
    r = df_rdd.map(ParallelBuild().myFunc)
    r.count()
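For what it's worth, the modular shape I am trying to keep is essentially this: a class whose run() builds the records and whose myFunc is handed to map() as the per-record function. A plain-Python sketch of that pattern (no Spark; the class and method names here just mirror mine for illustration):

```python
class ParallelBuildSketch(object):
    """Plain-Python stand-in for the structure above: run() builds the
    records, myFunc is the per-record function passed to map()."""

    def run(self):
        # (uid, address_uid) pairs, same shape as the DataFrame rows
        return [(1, 'a'), (1, 'b'), (2, 'd')]

    def myFunc(self, s):
        # operate on one record; here: keep just the uid
        return s[0]

builder = ParallelBuildSketch()
# passing the bound method builder.myFunc to map, like df_rdd.map(...)
r = list(map(builder.myFunc, builder.run()))
assert r == [1, 1, 2]
```

Note that in real pyspark, passing a bound method like ParallelBuild().myFunc means the whole instance gets serialized and shipped to the workers, which is part of what I am unsure about here.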