1
我有一個應用程序應該發送有限數量的消息給卡夫卡,然後退出。出於某種原因,即使我關閉了製作人,卡夫卡連接仍然保持着。我實現(斯卡拉)或多或少關閉卡夫卡連接
object Kafka {
private val props = new Properties()
props.put("compression.codec", DefaultCompressionCodec.codec.toString)
props.put("producer.type", "sync")
props.put("metadata.broker.list", "localhost:9092")
props.put("batch.num.messages", "200")
props.put("message.send.max.retries", "3")
props.put("request.required.acks", "-1")
props.put("client.id", "myclient")
private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
private def encode(msg: Message) = new KeyedMessage("topic", msg.id.getBytes, write(msg).getBytes)
def send(msg: Message) = Try(producer.send(encode(msg)))
def close() = producer.close()
}
這裏Message
是一個簡單的案例類,以及如何我將其轉換爲字節數組是不是真的有關。
消息確實到達,但是當我最終調用Kafka.close()
時,應用程序不會退出,並且連接似乎不會被釋放。
有沒有辦法明確要求卡夫卡終止連接?
很抱歉,如果它是不明確的。正如我所說,我**在完成時調用'Kafka.close()',然後調用'producer.close()'。但仍然保持連接狀態 – Andrea 2014-11-06 15:47:29
您認爲連接處於開放狀態?還有哪個版本的卡夫卡? – 2014-11-06 17:25:53
該版本是Kafka 0.8.1.1。至於卡夫卡保持開放,我只是看到程序沒有終止。你知道如何檢查Kafka相關的東西是否仍然有效?也許使用jvisualvm來看看線程 – Andrea 2014-11-06 17:52:58