我已經建立了一個卡夫卡服務器與3個經紀人。我想從我的計算機向這三位經紀人發送消息,但我已經爲ngix
中的abc.com/kafka1/ abc.com/kafka2/ abc.com/kafka3/
等網址配置了每位經紀人。卡夫卡生產者配置metadata.broker.list與url
如何在metadata.broker.list
屬性中使用這些網址?我的代碼如下。
package com.xxx.x.kafka.producer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "abc.com/kafka1/:80,abc.com/kafka2/:80,abc.com/kafka3/:80");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "com.knx.adx.kafka.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEnvents = 0; nEnvents < events; nEnvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com" + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
這是我運行我的代碼時得到的錯誤。
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.knx.adx.kafka.producer.TestProducer.main(TestProducer.java:35)
line error producer.send(data);
在服務器中它使用端口9092,但ngix將端口80轉發到外部。 – giaosudau
abc.com/kafka1/:80語法不正確。檢查abc.com:80/kafka1/ – jordi