2014-11-06 173 views
1

我有一個應用程序應該發送有限數量的消息給卡夫卡,然後退出。出於某種原因,即使我關閉了製作人,卡夫卡連接仍然保持着。我實現(斯卡拉)或多或少關閉卡夫卡連接

object Kafka { 
    private val props = new Properties() 

    props.put("compression.codec", DefaultCompressionCodec.codec.toString) 
    props.put("producer.type", "sync") 
    props.put("metadata.broker.list", "localhost:9092") 
    props.put("batch.num.messages", "200") 
    props.put("message.send.max.retries", "3") 
    props.put("request.required.acks", "-1") 
    props.put("client.id", "myclient") 

    private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props)) 
    private def encode(msg: Message) = new KeyedMessage("topic", msg.id.getBytes, write(msg).getBytes) 

    def send(msg: Message) = Try(producer.send(encode(msg))) 
    def close() = producer.close() 
} 

這裏Message是一個簡單的案例類,以及如何我將其轉換爲字節數組是不是真的有關。

消息確實到達,但是當我最終調用Kafka.close()時,應用程序不會退出,並且連接似乎不會被釋放。

有沒有辦法明確要求卡夫卡終止連接?

回答

1

高清的close()= producer.close()

這將創建一個名爲 「關閉」 功能調用producer.close() 我看不出有什麼證據證明你的代碼實際上關閉生產。

您需要只要致電: producer.close

+0

很抱歉,如果它是不明確的。正如我所說,我**在完成時調用'Kafka.close()',然後調用'producer.close()'。但仍然保持連接狀態 – Andrea 2014-11-06 15:47:29

+0

您認爲連接處於開放狀態?還有哪個版本的卡夫卡? – 2014-11-06 17:25:53

+0

該版本是Kafka 0.8.1.1。至於卡夫卡保持開放,我只是看到程序沒有終止。你知道如何檢查Kafka相關的東西是否仍然有效?也許使用jvisualvm來看看線程 – Andrea 2014-11-06 17:52:58