2014-02-12

I am using Kafka 0.8.0 and trying to implement the scenario described below, but I am seeing a delay before the consumer starts consuming messages in Apache Kafka.

JCA API (used as the producer to send data) -----> Consumer ------> HBase

I fetch data with a JCA client and send each message to the consumer as soon as possible. For example, as soon as the producer sends message 1, I want to fetch that same message from the consumer and put it into HBase. But my consumer only starts fetching messages after some random number n of messages. I want to keep the producer and consumer in sync so that both start working together.

I am using:

1 broker

1 single topic

1 single producer and a high-level consumer

Can anyone suggest what I need to do to achieve this?

EDIT:

Adding some relevant code snippets.

Consumer.java

import java.io.PrintWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        // Key decoder is a plain StringDecoder; the value decoder is the
        // custom Signal decoder (org.bigdata.kafka.Serializer).
        consumerMap = consumer.createMessageStreams(topicCountMap, kd,
                new Serializer(new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // 1 KB fetch size: messages larger than this cannot be consumed.
        props.put("fetch.size", "1024");
        return new ConsumerConfig(props);
    }

    public synchronized void run() {
        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}

Producer.java

import java.util.Properties;

import kafka.producer.ProducerConfig;

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic) {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Random partitioner; the key type is unused, so it is just String.
        // The message is a user-defined Signal object.
        producer = new kafka.javaapi.producer.Producer<String, Signal>(new ProducerConfig(props));
        this.topic = topic;
    }
}

KafkaProperties.java

public interface KafkaProperties { 
    final static String zkConnect = "127.0.0.1:2181"; 
    final static String groupId = "group1"; 
    final static String topic = "test00"; 
    final static String kafkaServerURL = "localhost"; 
    final static int kafkaServerPort = 9092; 
    final static int kafkaProducerBufferSize = 64 * 1024; 
    final static int connectionTimeOut = 100000; 
    final static int reconnectInterval = 10000; 
    final static String clientId = "SimpleConsumerDemoClient"; 
} 

This is how the consumer handles the first messages: it never prints the "In Consumer received msg" line for them, but from about the 11th message onwards it starts working normally.

    producer sending msg1
    producer sending msg2
    producer sending msg3
    producer sending msg4
    producer sending msg5
    producer sending msg6
    producer sending msg7
    producer sending msg8
    producer sending msg9
    producer sending msg10
    producer sending msg11
    producer sending msg12
    In Consumer received msg12
    producer sending msg13
    In Consumer received msg13
    producer sending msg14
    In Consumer received msg14
    producer sending msg15
    In Consumer received msg15
    producer sending msg16
    In Consumer received msg16
    producer sending msg17
    In Consumer received msg17
    producer sending msg18
    In Consumer received msg18
    producer sending msg19
    In Consumer received msg19
    producer sending msg20
    In Consumer received msg20
    producer sending msg21
    In Consumer received msg21

EDITED: this is the listener function from which the producer sends messages. I am using the default producer configuration and have not overridden it.

public synchronized void onValueChanged(final MonitorEvent event_) {
    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();
        final String[] val = (String[]) dbr.getValue();

        producer1.producer.send(new KeyedMessage<String, Signal>(
                KafkaProperties.topic, new Signal(messageNo)));
        System.out.println("producer sending msg" + messageNo);
        messageNo++;
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
Could you show your producer and consumer code/configuration? It looks like one of them is using batching (which is actually a good thing). – Dmitry

@Dmitry Added the code snippets. – Ankita

The consumer looks fine (except for the property fetch.size=1K, which means the consumer cannot receive larger messages, but that is probably not the problem we are looking for). Could you share the code of the producer's newProducerConfig() and run() methods? – Dmitry

Answer

  1. Try adding props.put("request.required.acks", "1") to the producer configuration. By default the producer does not wait for acknowledgements, and message delivery is not guaranteed. So if you start the broker just before your test, the producer may begin sending messages before the broker is fully initialized, and the first few messages can be lost.

  2. Try adding props.put("auto.offset.reset", "smallest") to the consumer configuration. It is equivalent to the --from-beginning option of kafka-console-consumer.sh. If your consumer starts later than the producer and there is no offset saved in Zookeeper, by default it will start consuming only new messages (see Consumer configs in the documentation).
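Taken together, the two suggestions amount to the following additions, shown here as a minimal sketch. The class name and the helper methods are illustrative; the broker and ZooKeeper addresses are the ones used elsewhere in the question, while the two new property names come from the Kafka 0.8 configuration documentation.

```java
import java.util.Properties;

public class KafkaSyncConfig {
    // Producer side: require the leader's acknowledgement for every send,
    // so messages sent before the broker is fully up are not silently lost.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("request.required.acks", "1");   // suggestion 1
        return props;
    }

    // Consumer side: when the group has no committed offset in ZooKeeper,
    // start from the earliest message instead of only new ones.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "group1");
        props.put("auto.offset.reset", "smallest"); // suggestion 2
        return props;
    }

    public static void main(String[] args) {
        System.out.println("acks=" + producerProps().getProperty("request.required.acks"));
        System.out.println("offset.reset=" + consumerProps().getProperty("auto.offset.reset"));
    }
}
```

These Properties objects would be passed to ProducerConfig and ConsumerConfig respectively, exactly as in the code above.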

Thanks for the suggestions. I added props.put("request.required.acks", "1") to the producer, but the program's behaviour is random. I ran the program 5 times, each time with a new topic, and all five runs gave different results: sometimes the producer and consumer were in sync, the rest of the time the consumer lagged. – Ankita

By "lagged" do you mean that all messages were received, just not immediately after being sent? In the original output the first few messages are missing entirely. – Dmitry

Yes, there are actually two cases: 1) sometimes all messages are received, but not immediately after they are sent; 2) at other times, as in the output provided, a few messages are lost. However, when I run "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicname --from-beginning" from the console, I get the same number of messages in the consumer as the producer produced. Why is that? – Ankita

This could be because there are more partitions than consumers. Check that the topic is created with only a single partition; then you will not miss any messages in the consumer.