2016-02-29 53 views
5

我有數千萬行的數據。是否有可能使用火花流分析在一週或一天內分析所有這些信息?根據數據量觸發流式傳輸的限制是什麼?我不確定什麼是上限,什麼時候我應該把它們放到我的數據庫中,因爲Stream可能無法處理它們了。我也有不同的時間窗口1,3,6小時等我用窗口操作來分離數據。同時根據數據量激發流媒體的限制是什麼?

conf = SparkConf().setAppName(appname) 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc,300) 
sqlContext = SQLContext(sc) 
channels = sc.cassandraTable("abc","channels") 
topic = 'abc.crawled_articles' 
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"} 

category = 'abc.crawled_article' 
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams) 
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categoryTransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x)) 


article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams) 
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x)) 

#axes topic integration the article and the axes 
axes_topic = 'abc.crawled_axes' 
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams) 
axes_join_stream = axes_stream.filter(lambda x:'delete' not in str(x)).map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).map(lambda x:(str(x['id']),x)).map(lambda x:(x[0],{'id':x[0], 'attitudes':x[1]['likes'],'reposts':0,'comments':x[1]['comments'],'speed':x[1]['comments']})) 
#axes_join_stream.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10).transform(axestrans).pprint() 

#join 
statistics = article_join_stream.window(1*60*60,5*60).cogroup(category_join_stream.window(1*60*60,60)).cogroup((axes_join_stream.window(24*60*60,5*60))) 
statistics.transform(joinstream).pprint() 

ssc.start() # Start the computation ssc.awaitTermination() 
ssc.awaitTermination() 
+1

這裏有很多問題,如果你清楚地分開它們,它會幫助回答。此外,如果您將包含的代碼最小化到足以說明問題的最小樣本,這將會很有幫助 – etov

回答

1

一:

請在下面找到我的代碼

  • 是否有可能分析[給定的時間]內[一些大型的行]

一般來說,是 - 星火允許您跨多臺機器進行擴展,所以在原則上,你應該能夠啓動一個大集羣,並在較短的時間內緊縮大量的數據(假設我們正在談論小時天而不是幾秒或更少,這可能由於開銷而出現問題)。

具體來說,在您的數千萬條記錄的問題中執行處理類型似乎對我來說在合理的時間內(即不使用極大的羣集)是可行的。

  • Spark Streaming在數據量方面的限制是什麼?

我不知道,但你會很難得到它。有非常大的部署例子,例如在ebay(「每天平均30TB的數百個指標」)。另請參閱FAQ,其中提到了一組8000臺機器並處理數據PB。

  • 何時將結果寫入[某種存儲]?

根據Spark-Streaming的basic model,數據以微批處理。如果你的數據確實是一個數據流(即沒有確定的結尾),那麼最簡單的方法就是存儲每個RDD的處理結果(即microbatch)。

如果您的數據不是流,例如你正在處理一堆靜態文件,你應該考慮放棄流部分(例如,只使用Spark作爲批處理器)。

由於您的問題提到了幾個小時的窗口大小,我懷疑您可能想考慮批量選項。

  • 如何在不同的時間窗口中處理相同的數據?

如果你使用的火花流,你可以保持多個狀態(例如使用mapWithState) - 每個時間窗口。

另一個想法(代碼更簡單,操作更復雜) - 您可以啓動多個集羣,每個集羣都有自己的窗口,從同一個流中讀取數據。

如果您正在進行批處理,可以在不同的時間窗口內多次運行相同的操作,例如,具有多個窗口大小的reduceByWindow