2015-11-03 67 views
2

如果使用Samza的OutgoingMessageEnvelope使用這種格式來發送消息:Samza在發送消息時是否自動創建分區?

public OutgoingMessageEnvelope(SystemStream systemStream, 
           java.lang.Object partitionKey, 
           java.lang.Object key, 
           java.lang.Object message) 
Constructs a new OutgoingMessageEnvelope from specified components. 
Parameters: 
systemStream - Object representing the appropriate stream of which this envelope will be sent on. 
partitionKey - A key representing which partition of the systemStream to send this envelope on. 
key - A deserialized key to be used for the message. 
message - A deserialized message to be sent in this envelope. 

和調用流任務的過程中()方法中這種方法,並希望將傳入郵件到適當的分區,將Samza創建你調用該方法時的分區?

E.g.

MessageA = {"id": "idA", "key": "keyA", "body":"some details"} 
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"} 

如果我稱之爲內流任務的process()其中msg是一個消息實例:

public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { 
    // ... 
    String partition = msg["id"] 
    String key = msg["key"] 
    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), id, key, msg)); 
    // ... 

這是否會自動創建分區,IDA和美洲開發銀行爲我(即我需要創造這些分區前我發信息給他們)?我希望能夠將消息路由到適當的分區,並且能夠使用單獨的消息密鑰記錄壓縮。

回答

3

創建主題時必須指定分區數。你不能動態添加新的分區(當然,你可以可以,但這並不容易,Samza不會自動完成)。如果主題不存在,但默認分區數量,Samza應爲您創建新主題。這取決於設置。你可以測試它。

但值msg["id"]未指定分區的名稱。該值僅用於計算目標分區的數量。該值被散列爲一個數字,然後使用模數進行修整。像這樣的東西(有多種算法,這是基本的一個):

partitionID = hash(msg["id"]) % total_number_of_partitions 

而且partitionID始終是一個非負整數。這意味着它實際上有多少個分區並不重要。它總是在一些結束。主要的想法是,如果你有兩個消息具有相同的msg["id"],那麼這些消息將最終在相同的分區中。這通常是你想要的。

日誌壓縮將按照您預期的方式工作 - 它將從特定分區中刪除具有相同密鑰的消息(但是如果您有兩個消息具有相同密鑰並且具有兩個不同分區,則不會被刪除)。

僅供參考,您可以使用kafkacat找出分區數量和其他有用的東西。

+0

非常感謝,這是一個非常明確和有用的答案。 – John