2017-01-22 328 views

I'm trying to use Gatling with Kafka, but every so often I get this error (SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.common.Cluster]):

01:32:53.933 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=12,client_id=producer-1}, body={topics=[test]})) to node 1011 
SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.common.Cluster] 
java.lang.NullPointerException 
    at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:72) 
    at java.lang.String.valueOf(String.java:2994) 
    at java.lang.StringBuilder.append(StringBuilder.java:131) 
    at java.util.AbstractCollection.toString(AbstractCollection.java:462) 
    at java.lang.String.valueOf(String.java:2994) 
    at java.lang.StringBuilder.append(StringBuilder.java:131) 
    at org.apache.kafka.common.Cluster.toString(Cluster.java:151) 
    at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:305) 
    at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:277) 
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:231) 
    at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:298) 
    at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:208) 
    at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:212) 
    at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:103) 
    at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:88) 
    at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:48) 
    at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:273) 
    at ch.qos.logback.classic.Logger.callAppenders(Logger.java:260) 
    at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:442) 
    at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:433) 
    at ch.qos.logback.classic.Logger.debug(Logger.java:511) 
    at org.apache.kafka.clients.producer.internals.Metadata.update(Metadata.java:133) 
    at org.apache.kafka.clients.NetworkClient.handleMetadataResponse(NetworkClient.java:313) 
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:298) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:199) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) 
    at java.lang.Thread.run(Thread.java:745) 
01:32:53.937 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.producer.internals.Metadata - Updated cluster metadata version 14 to [FAILED toString()] 

I'm not sure whether the error is related to my code, but here is my BasicSimulation.scala:

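// Imports assume the mnogu/gatling-kafka plugin; adjust the last one for a different Kafka plugin.
import io.gatling.core.Predef._
import org.apache.kafka.clients.producer.ProducerConfig
import scala.concurrent.duration._
import com.github.mnogu.gatling.kafka.Predef._

class BasicSimulation extends Simulation {

  // Kafka protocol configuration: target topic plus producer properties
  val kafkaConf = kafka
    .topic("test")
    .properties(
      Map(
        ProducerConfig.ACKS_CONFIG -> "1",
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.ByteArraySerializer",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.ByteArraySerializer"))

  // Each virtual user takes the next row of data.csv and sends its "data" column as bytes
  val scn = scenario("Kafka Test")
    .feed(csv("data.csv").circular)
    .exec(kafka("request")
      .send("${data}".getBytes: Array[Byte]))

  // 10 new users per second for 10 seconds
  setUp(
    scn
      .inject(constantUsersPerSec(10) during(10 seconds)))
    .protocols(kafkaConf)
}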

Here is the Kafka-related part of my docker-compose.yml:

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

kafka:
  image: wurstmeister/kafka
  ports:
    - "9092:9092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka
    KAFKA_ADVERTISED_PORT: 9092
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_CREATE_TOPICS: "test:1:1"
    KAFKA_LOG_CLEANER_ENABLE: 'true'
  volumes:
    - /tmp/docker.sock:/var/run/docker.sock

Answers


Stopping and removing my Docker containers seems to have fixed it.

docker stop $(docker ps -a -q) 
docker rm $(docker ps -a -q) 

In my case it was a version problem. I was using a Kafka build and a Kafka client library that were incompatible with each other.

The problem went away when I switched to kafka_2.11-0.10.2.1 and kafka-clients 0.10.2.0.
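
For an sbt-based Gatling project, pinning the client might look like the line below. This is only a sketch. It assumes the build uses sbt, and that the Kafka plugin in use tolerates a different kafka-clients version from the one it was built against. Neither is confirmed in this thread. The version is the one named in this answer; match it to your own broker.

```scala
// build.sbt (assumed build tool; not shown in the original post)
// Force a single kafka-clients version, overriding whatever the
// Gatling Kafka plugin pulls in transitively.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.2.0"
```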


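This is a Kafka dependency version problem: the installed Kafka version and the kafka-clients dependency version did not match. First verify the installed Kafka version, then update the kafka-clients version in the POM to match it. I faced the same problem and solved it by correcting the kafka-clients version.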

Installed Kafka version: 0.11

Maven dependency:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.11.0.0</version>
</dependency>

Now it works fine for me.
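
To see which kafka-clients jar actually ends up on the classpath (a plugin's transitive dependency can win over the one you declared), a small check along these lines may help. It is a minimal sketch: the object name `KafkaClientJarCheck` is made up for illustration, and it relies on the jar file name carrying the version, which is the usual case for Maven and sbt artifacts but not guaranteed.

```scala
import org.apache.kafka.clients.producer.KafkaProducer

// Hypothetical helper, not part of the original post.
object KafkaClientJarCheck extends App {
  // Where the producer class was loaded from. getCodeSource may be null
  // for some class loaders, hence the Option wrapper.
  val source = Option(classOf[KafkaProducer[_, _]].getProtectionDomain.getCodeSource)

  // The location normally ends in kafka-clients-<version>.jar
  println(source.map(_.getLocation.toString).getOrElse("unknown code source"))
}
```

Run it with the same classpath as the simulation and compare the reported version with the broker version.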
