2014-09-03 119 views
2

我試圖測試與卡夫卡駱駝整合爲解釋hereApache的駱駝和卡夫卡整合

以下是我的代碼

public class KafkaTest { 

    public static void main(String args[]) throws Exception { 
     CamelContext context = new DefaultCamelContext(); 

     context.addRoutes(new RouteBuilder() { 
      public void configure() { 
       from("kafka:test?zkConnect=localhost:2181&metadataBrokerList=localhost:9092") 
       .process(new Processor() { 
        @Override 
        public void process(Exchange exchange) throws Exception { 
         System.out.println(exchange.getIn().getBody()); 
        } 
       }) 
       .end(); 
      } 
     }); 

     context.start(); 
     while (true) { 

     } 
    } 
} 

不過,我收到以下錯誤

Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route(route1)[[From[kafka:test?zkConnect=localhost:2181&... because of Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. 

Unknown parameters=[{metadataBrokerList=localhost:9092, zkConnect=localhost:2181}] 

請建議可能缺少的內容。

回答

1

您應該使用在official documentation中命名的正確參數名稱。

from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181") 

您引用的版本,在github的wiki中描述,是Apache的貢獻,之後有所改變。

+0

有2駱駝卡夫卡組件漂浮在那裏。 https://github.com/Giwi/camel-kafka => 3歲。使用OP正在使用的格式。 https://github.com/apache/camel/tree/master/components/camel-kafka => CURRENT使用此答案中表示的參數。 – Jeff 2015-09-11 14:36:34

0

使用端點類?

類似:

public static KafkaEndpoint endpoint(String host, String port, String topic, String offset, String groupId) { 
     String endpointUri = "kafka://" + host + ":" + port; 
     KafkaEndpoint endpoint = new DefaultCamelContext().getEndpoint(endpointUri, KafkaEndpoint.class); 
     endpoint.getConfiguration().setTopic(topic); 
     endpoint.getConfiguration().setKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"); 
     endpoint.getConfiguration().setValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer"); 
     endpoint.getConfiguration().setAutoOffsetReset(offset); 
     endpoint.getConfiguration().setGroupId(groupId); 
     return endpoint; 
    } 

PollingConsumer consumer = endpoint.createPollingConsumer(); 

new RouteBuilder() { 
      public void configure() { 
       from(endpoint) 
       .process(new Processor() { 
        @Override 
        public void process(Exchange exchange) throws Exception { 
         System.out.println(exchange.getIn().getBody()); 
        } 
       }) 
       .end(); 
      } 
     }