Spring Kafka partitioning: what is the difference in publishing behavior between the following two code snippets?

Approach 1

Message<String> message = MessageBuilder.withPayload("testmsg") 
     .setHeader(KafkaHeaders.MESSAGE_KEY, "key").setHeader(KafkaHeaders.TOPIC, "test").build(); 

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message); 

Approach 2

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", "testmsg"); 

Topic configuration:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 
Topic:test PartitionCount:3 ReplicationFactor:1 Configs: 
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0 

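Observation:

With 3 consumers, one per partition: Approach 1 results in a single consumer receiving all messages from a single partition. With Approach 2, consumption is spread evenly across the three partitions/consumers.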

Answer

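But the answer is already in your code. The first snippet supplies a messageKey along with the topic.

The messageKey is what determines the target partition when no partition is specified explicitly: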

/** 
* computes partition for given record. 
* if the record has partition returns the value otherwise 
* calls configured partitioner class to compute the partition. 
*/ 
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { 
    Integer partition = record.partition(); 
    return partition != null ? 
      partition : 
      partitioner.partition(
        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); 
} 

where the DefaultPartitioner does this:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 
int numPartitions = partitions.size(); 
if (keyBytes == null) { 
    int nextValue = nextValue(topic); 
     ... 
} else { 
    // hash the keyBytes to choose a partition 
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; 
} 

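Therefore, all messages with the same key are sent to the same partition. Messages without a key are distributed across the topic's partitions in round-robin fashion.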
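To make the two cases concrete, here is a minimal standalone sketch of the same decision, with no Kafka dependency. The class `PartitionSketch` and the helper `partitionFor` are hypothetical names used only for illustration. `Arrays.hashCode` is a stand-in for Kafka's murmur2 hash, so the actual partition numbers will differ from what a real producer picks. Starting the counter at 0 is also a simplifying assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionSketch {

    // Clears the sign bit so the value is safe to use with the % operator.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Hypothetical helper: keyed records hash to a fixed partition,
    // unkeyed records advance a counter and therefore rotate (round-robin).
    static int partitionFor(byte[] keyBytes, int numPartitions, AtomicInteger counter) {
        if (keyBytes == null) {
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Stand-in hash (assumption): Kafka uses murmur2 here, not Arrays.hashCode.
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);

        // Approach 1: every message carries the same key.
        for (int i = 0; i < 3; i++) {
            System.out.println("keyed   -> partition " + partitionFor(key, 3, counter));
        }
        // Approach 2: no key is set.
        for (int i = 0; i < 3; i++) {
            System.out.println("unkeyed -> partition " + partitionFor(null, 3, counter));
        }
    }
}
```

The three keyed lines all report the same partition, while the unkeyed lines step through partitions 0, 1 and 2. If Approach 1 should also spread load, one option is to vary the key per message or to leave the key header out.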