2017-08-25 292 views
0

我正在嘗試編寫java api來創建kafka主題。我有Kafka版本0.11.0.0。我搜索堆棧溢出並嘗試了相同的方式。但無論話題是否存在,它總是讓我異常。使用java創建主題 - kafka版本> 0.10.0.0

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/kafka/common/errors/TopicExistsException 
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala) 
at kafkaStream.Processor.CreateTopic.main(CreateTopic.java:65) 
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.errors.TopicExistsException 
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 

我嘗試下面的代碼:

  String topicName = "t5"; 
      String zookeeperHosts = "XXXX:2181,XXXX:2181"; 
      int sessionTimeOutInMs = 15 * 1000; 
      int connectionTimeOutInMs = 50 * 1000; 
      Properties topicConfig = new Properties(); 
      zkClient = new ZkClient("XXXX:2181,XXXX:2181", 
        sessionTimeOutInMs, 
        connectionTimeOutInMs, 
        ZKStringSerializer$.MODULE$); 
      zkUtils = new ZkUtils(zkClient, new 
      ZkConnection(zookeeperHosts), false); 
      ZkUtils.apply(
        "XXXX:2181,XXXX:2181", 
        sessionTimeOutInMs, 
        connectionTimeOutInMs, 
        false); 
     //  AdminUtils.createTopic(zkUtils, topicName, numPartitions, 1, 
      topicConfig, RackAwareMode.Enforced$.MODULE$); 
      AdminUtils.createTopic(zkUtils, topicName, 2, 1, new 
      Properties(), RackAwareMode.Enforced$.MODULE$); 

Maven依賴 - >

 <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.11.0.0</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.11.0.0</version> 
     </dependency> 
+0

你是否添加了依賴api(kafka-特定版本的客戶端)jar到類路徑? –

+0

是的,我已經添加 – Megha

回答

1

由於您使用的是新版本0.11.0我建議使用新的管理客戶端API(以下鏈接爲一些文件:https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html)。

使用Zookeeper進行此類操作在將來會推遲使用新的Admin Client API。

+0

感謝您的建議,我試着用AdminClient。我無法做到。你可以分享任何例子 – Megha

+0

你可以在這裏找到一個非常簡單的例子(你只需要kafka-clients依賴,請參閱相關的pom.xml):https://github.com/ppatierno/kafka-playground/blob/ master/src/main/java/org/apache/kafka/playground/CreateTopic.java – ppatierno

+0

非常感謝@ppatierno,我遇到了導入錯誤軟件包的問題。非常感謝 – Megha