2015-07-28 496 views
1

我已經建立了一個卡夫卡服務器與3個經紀人。我想從我的計算機向這三位經紀人發送消息,但我已經爲ngix中的abc.com/kafka1/ abc.com/kafka2/ abc.com/kafka3/等網址配置了每位經紀人。卡夫卡生產者配置metadata.broker.list與url

如何在metadata.broker.list屬性中使用這些網址?我的代碼如下。

package com.xxx.x.kafka.producer; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

import java.util.Date; 
import java.util.Properties; 
import java.util.Random; 


    class TestProducer { 
     public static void main(String[] args) { 
      long events = Long.parseLong(args[0]); 
      Random rnd = new Random(); 

      Properties props = new Properties(); 
      props.put("metadata.broker.list", "abc.com/kafka1/:80,abc.com/kafka2/:80,abc.com/kafka3/:80"); 

      props.put("serializer.class", "kafka.serializer.StringEncoder"); 
      props.put("partitioner.class", "com.knx.adx.kafka.producer.SimplePartitioner"); 
      props.put("request.required.acks", "1"); 

      ProducerConfig config = new ProducerConfig(props); 

      Producer<String, String> producer = new Producer<String, String>(config); 

      for (long nEnvents = 0; nEnvents < events; nEnvents++) { 
       long runtime = new Date().getTime(); 
       String ip = "192.168.2." + rnd.nextInt(255); 
       String msg = runtime + ",www.example.com" + ip; 
       KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); 
       producer.send(data); 
      } 
      producer.close(); 
     } 
    } 

這是我運行我的代碼時得到的錯誤。

Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) 
    at kafka.producer.Producer.send(Producer.scala:77) 
    at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
    at com.knx.adx.kafka.producer.TestProducer.main(TestProducer.java:35) 

line error producer.send(data); 

回答

1

我會說abc.com/kafka1/:80不是正確的語法。我認爲正確的應該是abc.com:9092。

metadata.broker.list屬性中使用的url和port應該由您在Kafka broker server.properties文件中設置的內容(或者您在啓動時設置的任何名稱)確定。

重要的價值觀是:

# The port the socket server listens on 
port=xxx 
# Hostname the broker will bind to. If not set, the server will bind to all interfaces 
#host.name=localhost 

# Hostname the broker will advertise to producers and consumers. If not set, it uses the 
# value for "host.name" if configured. Otherwise, it will use the value returned from 
# java.net.InetAddress.getCanonicalHostName(). 
# advertised.host.name=<hostname routable by clients> 

端口默認是9092,所以如果你用80

希望這有助於檢查這一項。

+0

在服務器中它使用端口9092,但ngix將端口80轉發到外部。 – giaosudau

+0

abc.com/kafka1/:80語法不正確。檢查abc.com:80/kafka1/ – jordi

2

配置變量metadata.broker.list需要host1:port1,host2:port2而不是URL。嘗試爲每個代理配置不同的子域名,如kafka1.abc.com:80,kafka2.abc.com:80,kafka3.abc.com:80,並將這些子域指向適當的主機。請參閱Kafka Configuration的生產者配置部分

這是用於引導的,生產者只會用它來獲取元數據(主題,分區和副本)。發送實際數據的套接字連接將基於元數據中返回的代理信息建立。格式爲host1:port1,host2:port2,列表可以是代理的子集或指向代理子集的VIP。