2

故事是這樣的。我有一個kafka經紀人和一個特定的對象(我通過我的主題進行jsonify發送),它有一個我想用作密鑰的ID。Spring Cloud Streams沒有在消息中設置kafka密鑰?

當前我正在使用'partitionKeyExtractorClass'配置來設置提取ID並將其作爲關鍵字返回的類。

它看起來像這樣:

def extractKey(Message<?> message) { 
    log.info('Extracting key from message') 
    String id = new JsonSlurper().parseText(new String(message.payload)).properties.id 
    log.info("Got = ${id}") 

    return id 
} 

我實際的問題是,當我瀏覽這個專題的消息,這對我的消息說,關鍵是空的ConsumerRecord ...

這是一個錯誤?難道我做錯了什麼?關於這方面的文檔並沒有比這更進一步。

回答

1

看,你在混合partitionkey

目前KafkaMessageChannelBinder不提供選項來確定keyMessage

只有現有的功能,你可以用有力的KafkaHeaders.MESSAGE_KEY

Object messageKey = this.messageKeyExpression != null 
      ? this.messageKeyExpression.getValue(this.evaluationContext, message) 
      : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY); 

所以,在output消息,你應該計算密鑰並將其放入這個頭。

+1

這不是一個可接受的解決方案。我有同樣的問題。我使用原始模式輸出。我不想在郵件中添加任何標題,但需要設置關鍵字。我不想切換到Apache駱駝。 –

+0

您的關注尚不明確。這是當前Kafka Binder實現的一個限制,其中mesageKey只是一個省略。儘管如此,請隨時填寫該問題。您可以配置Kafka Binder現在不要映射該標頭,如果您關心這一點。我絕對不明白Apache Camel如何取代Spring Cloud Stream ... –

+1

問題是,除了Spring Cloud Streams之外,我正在將流與其他生產者和消費者進行整合。消息密鑰對於確保在kafka主題上的順序非常重要。 Spring Cloud Streams控制標題意味着所有其他生產者和消費者必須處理Spring Cloud Streams特定標題。我需要kafka消息正是我想要的,而不是Spring Cloud Streams想要的。一個簡單的鍵和字符串消息是我需要的。原始模式給了我,但沒有維護秩序的關鍵。 –

相關問題