2016-03-24 38 views
0

我正在使用Spark流來連續讀取來自kafka的數據並執行一些統計。我每秒都在流。如何查找每個DStream中RDD中所有值的總和?

所以我有一秒批(dstreams)。此dstream內的每個RDD都包含一個JSON。

這是我有我的DSTREAM:

kafkaStream = KafkaUtils.createDirectStream(stream, ['livedata'], {"metadata.broker.list": 'localhost:9092'}) 
raw = kafkaStream.map(lambda kafkaS: kafkaS[1]) 
clean = raw.map(lambda xs:json.loads(xs)) 

一個RDDS在我乾淨 DSTREAM看起來是這樣的:

{u'epochseconds': 1458841451, u'protocol': 6, u'source_ip': u'192.168.1.124', \ 
u'destination_ip': u'149.154.167.120', u'datetime': u'2016-03-24 17:44:11', \ 
u'length': 1589, u'partitionkey': u'partitionkey', u'packetcount': 10,\ 
u'source_port': 43375, u'destination_port': 443} 

而且我喜歡在30-150這樣的RDDS每個DStream。

現在,我想要做的是,獲得'長度'的總和或在每個DStream中說'packetcounts'。也就是說,

rdd1.length + rdd2.length + ... + LastRDDInTheOneSecondBatch.length 

我試了一下:

add=clean.map(lambda xs: (xs['length'],1)).reduceByKey(lambda a, b: a+b) 

我得到了什麼:

頻率,而不是總和。

(17, 6) 
(6, 24) 

我該怎麼做纔能有總和而不是密鑰的頻率?

回答

1

那是因爲你使用的「長度」作爲一個鍵的值,試試這個:

add=clean.map(lambda xs: ('Lenght',xs['length'])).reduceByKey(lambda a, b: a+b) 

你必須設定相同的密鑰對所有對(鍵,值)。值可能是字段長度或其他字段集合...

+0

作品,謝謝!只是一個額外的問題,我想從clean添加2個參數到add,比如說('partitionkey','timestamp'),以及剛剛計算的'length'參數。我怎麼做? – HackCode

相關問題