2016-10-02

Spark – sort a DStream by key and limit it to the top 5 values

I have started learning Spark and wrote a PySpark streaming program that reads stock data (symbol, volume) from port 3333.

Sample data on port 3333:

"AAC",111113 
"ABT",7451020 
"ABBV",7325429 
"ADPT",318617 
"AET",1839122 
"ALR",372777 
"AGN",4170581 
"ABC",3001798 
"ANTM",1968246 

I want to display a stream of the top 5 symbols by volume, so I used a mapper to read each line, split it on the comma, and reverse it.

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

sc = SparkContext("local[2]", "NetworkWordCount") 
ssc = StreamingContext(sc, 5) 

lines = ssc.socketTextStream("localhost", 3333) 
stocks = lines.map(lambda line: sorted(line.split(','), reverse=True)) 
stocks.pprint() 

Here is the output of stocks.pprint():

[u'111113', u'"AAC"'] 
[u'7451020', u'"ABT"'] 
[u'7325429', u'"ABBV"'] 
[u'318617', u'"ADPT"'] 
[u'1839122', u'"AET"'] 
[u'372777', u'"ALR"'] 
[u'4170581', u'"AGN"'] 
[u'3001798', u'"ABC"'] 
[u'1968246', u'"ANTM"'] 
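(A side note, verifiable in plain Python without Spark: `sorted(..., reverse=True)` compares the two substrings lexicographically, and digits sort after the `"` character, which is why the volume ends up first in each pair.)

```python
# Minimal sketch of what the mapper does to one line: sorted() compares
# the two pieces as strings, and '1' > '"', so reverse=True puts the
# volume string before the quoted symbol.
line = '"AAC",111113'
parts = sorted(line.split(','), reverse=True)
print(parts)  # ['111113', '"AAC"']
```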

I wrote the following function to display the stock symbols, but I don't know how to sort the stocks by key (volume) and then limit the function to display only the top 5 values.

def processStocks(stock): 
    for st in stock.collect(): 
        print st[1] 

stocks.foreachRDD(processStocks) 

Answer


Since a stream represents an infinite sequence, the best you can do is sort each batch. First, you have to parse the data correctly:

lines = ssc.queueStream([sc.parallelize([ 
    "AAC,111113", "ABT,7451020", "ABBV,7325429","ADPT,318617", 
    "AET,1839122", "ALR,372777", "AGN,4170581", "ABC,3001798", 
    "ANTM,1968246" 
])]) 

def parse(line): 
    try: 
        k, v = line.split(",") 
        return [(k, int(v))] 
    except ValueError: 
        # drop malformed lines instead of failing the batch 
        return [] 

parsed = lines.flatMap(parse) 
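The same parsing logic can be checked in plain Python: `flatMap` flattens the singleton/empty lists returned by `parse`, so malformed lines are silently dropped.

```python
def parse(line):
    """Parse 'SYMBOL,volume' into a singleton [(symbol, volume)] list,
    or return [] for malformed lines."""
    try:
        k, v = line.split(",")
        return [(k, int(v))]
    except ValueError:
        return []

# List comprehension mimicking flatMap: bad records simply vanish.
records = ["AAC,111113", "not-a-record", "ABT,7451020"]
parsed = [pair for line in records for pair in parse(line)]
print(parsed)  # [('AAC', 111113), ('ABT', 7451020)]
```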

Next, sort each batch:

sorted_ = parsed.transform(
    lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)) 

Finally, you can pprint the top elements:

sorted_.pprint(5) 

If everything went well, you should get output like this:

-------------------------------------------       
Time: 2016-10-02 14:52:30 
------------------------------------------- 
('ABT', 7451020) 
('ABBV', 7325429) 
('AGN', 4170581) 
('ABC', 3001798) 
('ANTM', 1968246) 
...
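If you only need the top 5 (rather than a fully sorted batch), you can also call `rdd.takeOrdered(5, key=lambda x: -x[1])` inside `foreachRDD` instead of sorting the whole RDD. The selection itself is equivalent to `heapq.nlargest`, sketched here in plain Python on the sample batch:

```python
import heapq

# The sample batch as (symbol, volume) pairs.
stocks = [("AAC", 111113), ("ABT", 7451020), ("ABBV", 7325429),
          ("ADPT", 318617), ("AET", 1839122), ("ALR", 372777),
          ("AGN", 4170581), ("ABC", 3001798), ("ANTM", 1968246)]

# Top 5 by volume, in descending order, without sorting everything.
top5 = heapq.nlargest(5, stocks, key=lambda x: x[1])
print(top5)
# [('ABT', 7451020), ('ABBV', 7325429), ('AGN', 4170581),
#  ('ABC', 3001798), ('ANTM', 1968246)]
```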