2017-12-18 365 views
0

這是我的路由生成器。在這裏,我試圖從我的文件插入數據topic.Later,我通過我的主要方法,並使用駱駝上下文我運行它。我嘗試了幾個代碼,但沒有人幫助我。我正在開發Apache kafka的POC - 駱駝。Apache Kafka + Apache Camel集成+ POC問題java.util.Hashtable.put上的java.lang.NullPointerException(Hashtable.java:459)

public class SimpleRouteBuilder extends RouteBuilder { 

    @Override 
    public void configure() throws Exception { 

     String topicName = "test120"; 
     String kafkaServer = "kafka:localhost:9092"; 
     String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181"; 
     String serializerClass = "serializerClass=kafka.serializer.StringEncoder"; 

     String toKafka = "kafka:localhost:9092?topic=test120;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1"; 


    // toKafka = new StringBuilder().append("&").append(serializerClass).toString(); 

       /*new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&") 
       .append(zooKeeperHost).append("&").append(serializerClass).toString();*/ 

     from("file:C:/inbox?noop=true").to(toKafka); 


    } 
} 

這是我的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>HelloWorld</groupId> 
    <artifactId>Pallavi</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <dependencies> 
     <dependency> 
      <groupId>org.apache.camel</groupId> 
      <artifactId>camel-core</artifactId> 
      <version>2.20.1</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.camel</groupId> 
      <artifactId>camel-kafka</artifactId> 
      <version>2.20.1</version> 
     </dependency> 

     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.11.0</version> 
     </dependency> 
    </dependencies> 

</project> 

這是我的主類:

import org.apache.camel.CamelContext; 
import org.apache.camel.impl.DefaultCamelContext; 

public class MainApp { 

    public static void main(String[] args) { 
     SimpleRouteBuilder routeBuilder = new SimpleRouteBuilder(); 
     CamelContext ctx = new DefaultCamelContext(); 
     try { 
      ctx.addRoutes(routeBuilder); 
      ctx.start(); 
      Thread.sleep(5 * 60 * 1000); 
      ctx.stop(); 
      System.out.println("hi i am working"); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 
     } 

    } 
} 

錯誤是:

java.lang.NullPointerException 
    at java.util.Hashtable.put(Hashtable.java:459) 
    at org.apache.camel.component.kafka.KafkaProducer.getProps(KafkaProducer.java:63) 
    at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:89) 
    at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61) 
    at org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:75) 
    at org.apache.camel.impl.DeferServiceStartupListener.onCamelContextStarted(DeferServiceStartupListener.java:49) 
    at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3859) 
    at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3638) 
    at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3490) 
    at org.apache.camel.impl.DefaultCamelContext.access$000(DefaultCamelContext.java:208) 
    at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3249) 
    at org.apache.camel.impl.DefaultCamelContext$2.call(DefaultCamelContext.java:3245) 
    at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3268) 
    at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3245) 
    at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61) 
    at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3168) 
    at demo.MainApp.main(MainApp.java:13) 
Picked up _JAVA_OPTIONS: -Xmx512M -Xms512M 
+0

可能重複[什麼是NullPointerException,以及如何解決它?](https://stackoverflow.com/questions/218384/what-is-a-nullpointerexception-and-how-doi-i-fix -it) –

+0

您尚未配置代理選項 –

回答

0

您需要配置brokers選項在端點上。或者在全球的kafka組件上。

我已經登錄了票,使駱駝在報告在下一個版本一個更好的異常:https://issues.apache.org/jira/browse/CAMEL-12090

這裏有一個例子卡夫卡,你可以看看太:https://github.com/apache/camel/tree/master/examples/camel-example-kafka

+0

我試過github代碼,我收到此錯誤:org.apache.kafka.common.errors.TimeoutException:無法在60000毫秒後更新元數據。 –

0

也許你沒有設置生產性能其中有爲卡夫卡製作人提供的配置。

駱駝卡夫卡組件需要配置強制卡夫卡生產者屬性。

0

確保目錄C:\ inbox確實存在。此外,請確保您更改POM文件在本教程中提到的那些依賴的版本,因爲駱駝卡夫卡2.17.0事情已經改變,它不再接受zookeeperHostzookeeperPort值在到Kafka字符串中。