我正在與Kafka合作,並嘗試通過關注此article來設置消費者羣組。唯一的區別是我創建了自己的抽象類,處理器使設計更簡單。來自kafka消費者的InstanceAlreadyExistsException
下面是我的抽象類:
public abstract class Consumer implements Runnable {
private final Properties consumerProps;
private final String consumerName;
public Consumer(String consumerName, Properties consumerProps) {
this.consumerName = consumerName;
this.consumerProps = consumerProps;
}
protected abstract void shutdown();
protected abstract void run(String consumerName, Properties consumerProps);
@Override
public final void run() {
run(consumerName, consumerProps);
}
}
下面是我KafkaConsumerA
延伸上面抽象類:
public class KafkaConsumerA extends Consumer {
private KafkaConsumer<byte[], DataHolder> consumer;
public KafkaConsumerA(String consumerName, Properties consumerProps) {
super(consumerName, consumerProps);
}
@Override
public void shutdown() {
consumer.wakeup();
}
@Override
protected void run(String consumerName, Properties consumerProps) {
// exception comes from below line from two of the threads and the remaining one thread works fine.
consumer = new KafkaConsumer<>(consumerProps);
List<String> topics = getTopicsBasisOnConsumerName(consumerName);
try {
consumer.subscribe(topics);
// Setup the schema config
Map<String, Object> config = new HashMap<>();
config.put("urls", "https://abc.qa.host.com");
GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
while (true) {
ConsumerRecords<byte[], DataHolder> records = consumer.poll(200);
for (ConsumerRecord<byte[], DataHolder> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out
.println((Thread.currentThread().getId() % 3) + 1 + ": " + decoder.decode(record.value()));
}
}
} catch (WakeupException ex) {
ex.printStackTrace();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
consumer.close();
}
}
}
,下面將我的處理程序類:
// looks like something is wrong in this class
public final class ConsumerHandler {
private final ExecutorService executorServiceProcess;
private final Consumer consumer;
private final List<Consumer> consumers = new ArrayList<>();
public ConsumerHandler(Consumer consumer, int poolSize) {
this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
this.consumer = consumer;
for (int i = 0; i < poolSize; i++) {
consumers.add(consumer);
executorServiceProcess.submit(consumer);
}
}
public void shutdown() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (Consumer consumer : consumers) {
consumer.shutdown();
}
executorServiceProcess.shutdown();
try {
executorServiceProcess.awaitTermination(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
});
}
}
這裏我是從主類的消費者羣體中啓動我的所有消費者:
public static void main(String[] args) {
ConsumerHandler handlerA =
new ConsumerHandler(new KafkaConsumerA("KafkaConsumerA", getConsumerProps()), 3);
// run KafkaConsumerB here
handlerA.shutdown();
// shutdown KafkaConsumerB here
}
所以用這個 - 我的計劃是建立一個消費羣體有三個消費者KafkaConsumerA
和所有三個訂閱相同的主題。
錯誤: -
每當我跑這個,貌似只有一個消費羣體在消費工作和其他兩名不起作用。我從控制檯上看到這個例外:
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=a97716e0-0e05-4938-8fa1-6b872cf24e34
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_79]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_79]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.0-SASL.jar:na]
我在這裏做什麼? getConsumerProps()
方法返回屬性對象,其中client.id
和group.id
與該消費者組中的所有三個消費者具有相同的值。
下面是我的設計細節:
- 我
KafkaConsumerA
將有3名消費者在消費羣體和每一個消費者將在topicA
工作。 - 我的
KafkaConsumerB
(與KafkaConsumerA類似)將有兩個消費者在不同的消費羣體中,並且這些消費者中的每一個都將在topicB
上工作。
而這兩個消費者KafkaConsumerA
和KafkaConsumerB
將運行在相互獨立的不同消費者組的相同盒子上。
我明白了。謝謝(你的)信息。我會試試看。另外,你認爲我的設計和邏輯是正確的,我正在嘗試做什麼。我在我的問題的底部提到了它。或者你看到任何可以完成的設計改進? – user1950349
這是一個非常合理的設置,但我當然不知道你的域名。 –
我的想法是在同一個JVM上運行多個kafka使用者。每個卡夫卡消費者都有多個線程。例如:'KafkaConsumerA'將有三個使用者在一個使用topicA的消費羣中。 'KafkaConsumerB'將有2個消費者在另一個使用'topicB'的消費羣體中。這兩個'KafkaConsumerA'和'KafkaConsumerB'都將在單個JVM上運行。 – user1950349