2017-02-04 31 views
0

我在一個maven項目中製作了一個卡夫卡製作人和消費者。普通春卡夫卡聽衆

我想在另一個maven項目中使用它,所以我添加了上述kafka項目的依賴項。現在問題是生產者是好的,但如何使偵聽器通用,可以被添加此項目的所有其他項目覆蓋。

目前,我有聽衆在一個項目

public class Listener { 
    public CountDownLatch countDownLatch0 = new CountDownLatch(3); 
    public CountDownLatch countDownLatch1 = new CountDownLatch(3); 
    public CountDownLatch countDownLatch2 = new CountDownLatch(3); 

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "0" }) }) 
    public void listenPartition0(ConsumerRecord<?, ?> record) { 
     System.out.println("Listener Id0, Thread ID: " + Thread.currentThread().getId()); 
     System.out.println("Received: " + record); 
     countDownLatch0.countDown(); 
    } 

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "1" }) }) 
    public void listenPartition1(ConsumerRecord<?, ?> record) { 
     System.out.println("Listener Id1, Thread ID: " + Thread.currentThread().getId()); 
     System.out.println("Received: " + record); 
     countDownLatch1.countDown(); 
    } 

    @KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "2" }) }) 
    public void listenPartition2(ConsumerRecord<?, ?> record) { 
     System.out.println("Listener Id2, Thread ID: " + Thread.currentThread().getId()); 
     System.out.println("Received: " + record); 
     countDownLatch2.countDown(); 
    } 

如何讓普通聽衆可以通過其他的項目誰所有添加這個項目作爲依賴,並能聽取他們各自的主題將覆蓋

回答

0

如果我正確理解你的問題,我想你需要做這樣的事情:

在共享的.jar ...

public abstract class Listener { 
    public CountDownLatch countDownLatch0 = new CountDownLatch(3); 
    public CountDownLatch countDownLatch1 = new CountDownLatch(3); 
    public CountDownLatch countDownLatch2 = new CountDownLatch(3); 


    abstract void handlePartition0(record); 
    abstract void handlePartition1(record); 
    abstract void handlePartition2(record); 

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "0" }) }) 
    public void listenPartition0(ConsumerRecord<?, ?> record) { 
    handlePartition0(record); 
    countDownLatch0.countDown(); 
    } 

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "1" }) }) 
    public void listenPartition1(ConsumerRecord<?, ?> record) { 
    handlePartition1(record); 
    countDownLatch1.countDown(); 
    } 

    @KafkaListener(id = "id2", topicPartitions = { @TopicPartition(topic = "SpringKafkaTopic1", partitions = { "2" }) }) 
    public void listenPartition2(ConsumerRecord<?, ?> record) { 
    handlePartition2(record); 
    countDownLatch2.countDown(); 
    } 
} 

然後在子項目中,導入共享的.jar並導入handleXXX方法。

public class MyChildListener extends Listener { 
    public void handlePartition0(Record<?,?> r) { 
    // do something useful 
    } 
    ... 
} 

這可能適合你。

+0

感謝這就是我一直在尋找.. – user1201239