2016-08-23 46 views
1

我是Storm的新手。我正在將它用於大學項目。如何停止Storm中的元組處理並執行其他代碼

我創建了我的拓撲,連接到MySql數據庫的Spout和兩個Bolts。與噴口相連的第一個螺栓準備並去除元組中不需要的信息;第二,做元組的過濾。

我在本地模式下工作。

我的問題是: 爲什麼運行拓撲後,在我的控制檯中,我看到輸出像下面的行?

38211 [Thread-14-movie-SPOUT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30] 
67846 [Thread-10-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67846 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67852 [Thread-10-__acker] INFO backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]] 
67853 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]] 
67854 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67855 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]] 

,我讀了這些行處理的最後一個元組後,被認爲是正常的。不是嗎?

如何在提交拓撲後運行其他代碼?例如,我想打印在第二個螺栓中完成的過濾結果,保存在HashMap中。 如果我將代碼放在包含submitTopology()方法的行之後,代碼將在元組完成之前運行。

第二個和最後一個問題是:爲什麼在風暴的每一個例子,我看到在噴

「的Thread.sleep(1000)」?

也許它與我的第一個問題有關。

我希望我的問題很清楚。 提前謝謝!

回答

0

我讀到這些行處理後的最後一個元組被認爲是正常的。不是嗎?

這些只是INFO消息。所以不用擔心它們。

如果我把代碼放在包含submitTopology()方法的行後面,代碼就會在元組完成之前運行。

如果您提交了拓撲結構,拓撲將在後臺執行(即多線程)。這是必需的,因爲拓撲運行是「永久」的(直到你明確地停止它 - 或者你的Java應用程序終止,因爲你正在運行本地模式)。

「拓撲完成後」的運行代碼與Storm概念並不一致,因爲Strom是一個流式處理系統,並且「處理中沒有結束」(輸入流處於無限狀態,因此處理將永久運行)。如果你想處理一個有限的數據集,你可能想要考慮像Flink或Spark這樣的批處理框架。

因此,如果您想在Storm中完成這項工作,您需要能夠確定何時處理所有數據。因此,在拓撲提交之後,在處理完所有數據之後,您將顯式阻止並等待。

但是,對於您的用例,您爲什麼不直接從最後一個螺栓中打印結果?

關於Thread.sleep()我不確定你指的是什麼樣的例子。不知道爲什麼有人應該把它用於生產。也許是爲了演示目的而人爲地減慢處理速度。

+0

謝謝你的詳盡回覆,馬提亞! –

+0

如果這回答你的問題,隨時接受和/或upvote我的答案。 –