我想從Dstream的rdds中採樣。由於DSTREAM沒有sample()
轉型,這是RDDS的序列,所以我這樣做是爲了從DSTREAM取樣品並應用wordcount的就可以了:如何使用Spark Dstreams進行簡單的隨機採樣?(pyspark使用spark 1.6.1)
from pyspark import SparkContext
from pyspark import SparkConf
# Optionally configure Spark Settings
conf=SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("SRS")
sc = SparkContext('local[3]', conf=conf)
from pyspark.streaming import StreamingContext
streamContext = StreamingContext(sc,3)
lines = streamContext.socketTextStream("localhost", 9000)
def sampleWord(rdd):
return rdd.sample(false,0.5,10)
lineSample = lines.foreachRDD(sampleWord)
words = lineSample.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word , 1))
wordCount = pairs.reduceByKey(lambda x, y: x + y)
wordCount.pprint(60)
streamContext.start()
streamContext.stop()
有了這個代碼,星火啓動,但沒有到底會發生。我不知道爲什麼rdd.sample()
不以這種方式工作?使用foreachRDD
,我們可以訪問流中的每個rdd,因此我認爲現在我們可以使用rdd特定的轉換。
再次出現在sampleWord =>收益rdd.sample錯誤(錯誤,0.5,100) :NameError:全局名稱 '真' 沒有定義。沒有任何反應。我不明白火花是否計算樣品。 – YyAaSs