2015-11-05 145 views
3

(還有一些關於超時和maxSpoutPending的問題)風暴如何知道郵件何時「完全處理」?

我在Storm文檔中看到很多關於消息被完全處理的參考文獻。但是,我的KafkaSpout如何知道消息何時完全處理?

希望它能認識到我的螺栓連接的方式,所以當我的Stream中的最後一個螺栓夾住一個元組時,噴口知道我的消息何時處理?否則,我會想象在超時時間到期後,檢查消息的確認狀態,並且如果由確認/錨定XOR指示,則認爲它已被處理。但我希望事實並非如此?

我也有關於maxTuplesPending和超時配置的相關問題。

如果我將maxTuplePending設置爲10k,那麼我是否認爲每個spout實例都會繼續發射元組,直到spout實例正在跟蹤10k元組中的10k個元組,這些元組尚未完全處理?然後當一個正在處理的飛行消息被完全處理時,新的元組被髮射出去了嗎?

最後,這是否與超時配置有關?噴嘴在發出新消息之前是否以任何方式等待發生配置的超時?或者,如果消息處於停滯/緩慢狀態,超時配置才起作用,導致由於超時而失敗?

更簡潔(或希望更清楚),是否有一個效果來設置我的超時30分鐘除非消息不會失敗,除非他們在30分鐘內被最終的博爾特認可?或者是否還有其他影響,例如影響噴口排放速率的超時配置?

對不起,漫長而漫長的問題。預先感謝任何迴應。

*編輯進一步澄清

的原因,這對我來說是一個問題,是因爲我的消息並不一定要通過整個流運行。

說我有螺栓A,B,C,D。大多數時間的消息將從A-> B - > - > D傳遞。但是我有一些信息會故意停在螺栓A上.A會識別它們,但不會發出它們(因爲我的業務邏輯,在這種情況下,我希望進一步處理這些信息)。

那麼我的KafkaSpout是否知道被Ack發送但未從A發出的消息會被完全處理?因爲我希望在螺栓A完成之後馬上從噴口發出另一條消息,在這種情況下。

回答

4

Storm通過UDF代碼必須使用的錨定機制跟蹤整個拓撲中的元組。該錨定導致所謂的元組樹(tuple-tree),如果樹的根是由噴口發出的元組,則所有其他節點(以樹結構連接)代表來自螺栓的發射元組,其使用輸入元組作爲錨(這只是一個邏輯模型,並沒有在Storm中以這種方式實現)。

例如,Spout會發出一個由第一個螺栓分開的句子元組,第二個螺栓將過濾一些單詞,第三個螺栓將應用一個單詞計數。最後,水槽螺栓將結果寫入文件。樹是這樣的:

"this is an example sentence" -+-> "this" 
           +-> "is" 
           +-> "an" 
           +-> "example" -> "example",1 -> "example",1 
           +-> "sentence" -> "sentence",1 -> "sentence",1 

,頭一句是通過口發出,通過bolt1對於那些發出的所有令牌作爲錨,並且得到由bolt1獲得確認。 Bolt2過濾出「this」,「is」和「an」,並且僅僅包含三個元組。 「例子」和「句子」只是被轉發,用作輸出元組的錨點並在之後進行搜索。同樣的情況發生在bolt2中,並且最後的sink螺栓只是對所有傳入的元組進行確認。此外,Storm追蹤所有元組的所有元組,即從中間螺栓和下沉螺栓中抽取所有元組。首先,噴口將輸出元組的ID發送給acker任務。每次元組被用作定位點時,acker也會收到一個消息,其中包含錨元組標識和輸出元組ID(由Storm自動生成)。來自螺栓的ackes也會轉到與XORs相同的acker任務。如果所有的ack都被接收到 - 即,對於噴口和所有遞歸錨定的輸出元組 - - (XOR結果將爲零),則acker向噴口發送消息,該元組已被完全處理,並且發生後退到Spout.ack(MessageId)即當元組被完全處理時立即完成回呼)。此外,ackers會定期檢查,如果有一個由acker註冊的元組比超時長。如果發生這種情況,acker將丟棄元組ID,並向元組發送一條消息,指出元組失敗(導致調用Spout.fail(MessageId))。

此外,Spouts保留在飛行中的所有元組的計數,並且如果該計數超過maxTuplesPending參數,則停止呼叫Spout.nextTuple()。據我所知,這個參數是全局應用的,也就是說,每個噴口任務的局部計數被總結,並且全局計數與參數進行比較(不知道如何詳細實現這個)。因此timeout參數獨立於maxTuplesPending

+0

非常感謝您的詳細解答。你能解決我在編輯中提到的情況嗎?風暴如何知道「所有的賄賂都已收到」。如果我故意在A後插入A,但不要將元組發送到B,那麼這個消息是否會超時? – ab11

+0

看看我的例子。元組「this」,「is」和「an」將被一箇中間螺栓過濾掉,即只有acked和沒有輸出。這工作得很好。如果一個句子中的所有單詞都會被過濾掉,那麼這棵樹就不那麼深了,但是初始句子對噴口來說會很好。 –

+0

再次感謝。我仍然對Storm如何處理這個問題感到困惑。如果螺栓A調用ack並且不發射,Storm如何知道調用Spout.ack(messageId)?我認爲只有當Tuple樹中的最後一個螺栓確定了一個元組時,它纔會調用Spout.ack(messageId)(它怎麼知道在這種情況下,我沒有發出這個樹,比典型的深)。我問這是因爲即使我的拓撲在超時時間內完全處理了它的消息,我也看到了噴口故障,所以我懷疑這些故障來自螺栓A確認但沒有發出的消息。 – ab11