2016-08-18 263 views
0

我想從Dstream的rdds中採樣。由於DSTREAM沒有sample()轉型,這是RDDS的序列,所以我這樣做是爲了從DSTREAM取樣品並應用wordcount的就可以了:如何使用Spark Dstreams進行簡單的隨機採樣?(pyspark使用spark 1.6.1)

from pyspark import SparkContext 
from pyspark import SparkConf 

# Optionally configure Spark Settings 
conf=SparkConf() 
conf.set("spark.executor.memory", "1g") 
conf.set("spark.cores.max", "2") 

conf.setAppName("SRS") 
sc = SparkContext('local[3]', conf=conf) 

from pyspark.streaming import StreamingContext 
streamContext = StreamingContext(sc,3) 
lines = streamContext.socketTextStream("localhost", 9000) 

def sampleWord(rdd): 
    return rdd.sample(false,0.5,10) 


lineSample = lines.foreachRDD(sampleWord) 
words = lineSample.flatMap(lambda line: line.split(" ")) 
pairs = words.map(lambda word: (word , 1)) 
wordCount = pairs.reduceByKey(lambda x, y: x + y) 
wordCount.pprint(60) 


streamContext.start() 
streamContext.stop() 

有了這個代碼,星火啓動,但沒有到底會發生。我不知道爲什麼rdd.sample()不以這種方式工作?使用foreachRDD,我們可以訪問流中的每個rdd,因此我認爲現在我們可以使用rdd特定的轉換。

回答

0

使用transform,而不是foreachRDD。此外,您的代碼中存在拼寫錯誤。

def sampleWord(rdd): 
return rdd.sample(False,0.5,10) //False, not false 

lineSample = lines.transform(sampleWord) 
words = lineSample.flatMap(lambda line: line.split(" ")) 
pairs = words.map(lambda word: (word , 1)) 
wordCount = pairs.reduceByKey(lambda x, y: x + y) 
wordCount.pprint(60) 
0

使用transform

lineSample = lines.transform(sampleWord) 
+0

再次出現在sampleWord =>收益rdd.sample錯誤(錯誤,0.5,100) :NameError:全局名稱 '真' 沒有定義。沒有任何反應。我不明白火花是否計算樣品。 – YyAaSs