2017-04-12 164 views
0

以下是生產者和消費者的類別。何時能夠生成數據並且無法使用以下代碼進行使用。有人能幫我一下嗎?我在編碼方面做錯了什麼?我的目標是讀取消費者的CustomMessage對象並將數據存儲在數據庫中。如何從生產者消費卡夫卡的消費者?

在我的cmd提示符中,我爲zookeeper打開了5個實例1,爲卡夫卡打開了1個實例,爲生產者打開了1個實例,爲消費者打開了1個實例。我真的不明白。當我運行生產者和消費者類時,是否需要保留所有實例?

任何指針都會非常有幫助。

在此先感謝。

producer class::: 

    package com.kafka.test.demo; 

    import java.io.IOException; 
    import java.util.Properties; 

    import javax.xml.parsers.ParserConfigurationException; 

    import org.apache.kafka.clients.producer.KafkaProducer; 
    import org.apache.kafka.clients.producer.ProducerRecord; 
    import org.xml.sax.SAXException; 

    public class KafkaaProducer { 
     public static void main(String[] args) throws ParserConfigurationException, SAXException, IOException { 
      Properties props = new Properties(); 
//customMessage is a pojo object which should be send to the consumer.. 
      CustomMessage customMessage= new CustomMessage(); 
      customMessage.setMessage("hello kafka"); 
      customMessage.setFan("1234213123"); 
      customMessage.setSourceSystem("Dmap"); 
      customMessage.setStatus("Unenrolled"); 
      customMessage.setMessageTyep("Simple Message"); 
      customMessage.setCreatedTime("5"); 
      customMessage.setProcessedTime("6"); 
      customMessage.setRetryCount("3"); 
      props.put("metadata.broker.list", "localhost:9092"); 
      props.put("serializer.class", "kafka.serializer.StringEncoder"); 
      props.put("request.required.acks", "1"); 
      props.put("bootstrap.servers", "localhost:9092,localhost:9093"); 
      //CustomMessageSerializer 
props.put("key.serializer","com.kafka.test.demo.CustomMessageSerializer"); 
      props.put("value.serializer", "com.kafka.test.demo.CustomMessageSerializer"); 
      try { 
       KafkaProducer<String, CustomMessage> producer = new KafkaProducer<String, CustomMessage>(props); 
       producer.send(new ProducerRecord<String, CustomMessage>("NewMessageTopic", "customMessage",customMessage)); 
       //producer.send(new ProducerRecord<String, CustomMessage>("NewMessageTopic", customMessage)); 
       System.out.println("Message " + "" + " sent !!"); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    consumer class:: 
    package com.kafka.test.demo; 

    import java.net.UnknownHostException; 
    import java.util.Collections; 
    import java.util.Properties; 

    import org.apache.kafka.clients.consumer.ConsumerRecord; 
    import org.apache.kafka.clients.consumer.ConsumerRecords; 
    import org.apache.kafka.clients.consumer.KafkaConsumer; 

    import com.mongodb.BasicDBObject; 
    import com.mongodb.DB; 
    import com.mongodb.DBCollection; 
    import com.mongodb.DBObject; 
    import com.mongodb.MongoClient; 

    public class KafkaaConsumer { 
     public static void main(String[] args) throws InterruptedException { 
      Properties props = new Properties(); 
      props.put("zookeeper.connect", "localhost:2181"); 
      props.put("group.id", "testgroup"); 
      props.put("zookeeper.session.timeout.ms", "4000"); 
      props.put("zookeeper.sync.time.ms", "300"); 
      props.put("rebalance.backoff.ms", "40000"); 
      props.put("bootstrap.servers", "localhost:9092,localhost:9093"); 
      props.put("value.deserializer", "com.kafka.test.demo.CustomMessageDeserializer"); 
      props.put("key.deserializer", "com.kafka.test.demo.CustomMessageDeserializer"); 
      //perisitMessage(); 
      try{ 
       KafkaConsumer<String,CustomMessage> consumer = new KafkaConsumer<String, CustomMessage>(props); 
       consumer.subscribe(Collections.singletonList("NewMessageTopic")); 
       while (true) { 
        ConsumerRecords<String, CustomMessage> messages = consumer.poll(100); 
        for (ConsumerRecord<String, CustomMessage> message : messages) { 
         System.out.println("Message received " + message); 
        } 
        perisitMessage(); 
       } 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 

     private static void perisitMessage() { 
      // TODO Auto-generated method stub 
      CustomMessage customMessage = new CustomMessage(); 
      customMessage.setMessage("hello kafka"); 
      customMessage.setFan("1234213123"); 
      customMessage.setSourceSystem("Dmap"); 
      customMessage.setStatus("Unenrolled"); 
      customMessage.setMessageTyep("Simple Message"); 
      customMessage.setCreatedTime("5"); 
      customMessage.setProcessedTime("6"); 
      customMessage.setRetryCount("3"); 
      try { 
       MongoClient mongoClient = new MongoClient("localhost" , 27017); 
       DB db = mongoClient.getDB("DeviceTrack"); 
       DBCollection msgCollection = db.getCollection("messages"); 
       BasicDBObject document = new BasicDBObject(); 
       document.put("message", customMessage.getMessage()); 
       document.put("fan", customMessage.getFan()); 
       document.put("SourceSystem", customMessage.getSourceSystem()); 
       document.put("RetryCount", customMessage.getRetryCount()); 
       document.put("ProcessedTime", customMessage.getProcessedTime()); 
       document.put("CreatedTime", customMessage.getCreatedTime()); 
       document.put("MessageTyep", customMessage.getMessageTyep()); 
       document.put("Status", customMessage.getStatus()); 
       msgCollection.insert(document); 
       System.out.println("Inserted in the data in DB succesfully"); 

      } catch (UnknownHostException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 
    } 

package com.kafka.test.demo; 

import java.util.Map; 

import org.apache.kafka.common.serialization.Deserializer; 

import com.fasterxml.jackson.databind.ObjectMapper; 

public class CustomMessageDeserializer implements Deserializer { 

    public Object deserialize(String arg0, byte[] arg1) { 
     ObjectMapper mapper = new ObjectMapper(); 
     System.out.println("arg1"+arg1); 
     CustomMessage message = null; 
     try { 
      message = mapper.readValue(arg1, CustomMessage.class); 
     } catch (Exception e) { 

      e.printStackTrace(); 
     } 
     System.out.println(""+message); 
     return message; 
    } 



    public void close() { 
     // TODO Auto-generated method stub 

    } 

    public void configure(Map arg0, boolean arg1) { 
     // TODO Auto-generated method stub 

    } 

} 

    package com.kafka.test.demo; 

    import java.util.Map; 

    import org.apache.kafka.common.serialization.Serializer; 

    import com.fasterxml.jackson.databind.ObjectMapper; 

    public class CustomMessageSerializer implements Serializer { 

     public byte[] serialize(String arg0, Object arg1) { 
      byte[] retVal = null; 
      ObjectMapper objectMapper = new ObjectMapper(); 
      try { 
       retVal = objectMapper.writeValueAsString(arg1).getBytes(); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
      System.out.println("value ::::::"+retVal); 
      return retVal; 
     } 

     public void close() { 
      // TODO Auto-generated method stub 

     } 

     public void configure(Map arg0, boolean arg1) { 
      // TODO Auto-generated method stub 

     } 
    } 

package com.kafka.test.demo; 

public class CustomMessage { 

    private String messageId; 
    private String parentMsgId; 
    private String fan; 
    private String message; 
    private String sourceSystem; 
    private String status; 
    private String messageTyep; 
    private String createdTime; 
    private String processedTime; 
    private String retryCount; 

    /** 
    * @return the messageId 
    */ 
    public String getMessageId() { 
     return messageId; 
    } 
    /** 
    * @param messageId the messageId to set 
    */ 
    public void setMessageId(String messageId) { 
     this.messageId = messageId; 
    } 
    /** 
    * @return the parentMsgId 
    */ 
    public String getParentMsgId() { 
     return parentMsgId; 
    } 
    /** 
    * @param parentMsgId the parentMsgId to set 
    */ 
    public void setParentMsgId(String parentMsgId) { 
     this.parentMsgId = parentMsgId; 
    } 
    /** 
    * @return the fan 
    */ 
    public String getFan() { 
     return fan; 
    } 
    /** 
    * @param fan the fan to set 
    */ 
    public void setFan(String fan) { 
     this.fan = fan; 
    } 
    /** 
    * @return the message 
    */ 
    public String getMessage() { 
     return message; 
    } 
    /** 
    * @param message the message to set 
    */ 
    public void setMessage(String message) { 
     this.message = message; 
    } 
    /** 
    * @return the sourceSystem 
    */ 
    public String getSourceSystem() { 
     return sourceSystem; 
    } 
    /** 
    * @param sourceSystem the sourceSystem to set 
    */ 
    public void setSourceSystem(String sourceSystem) { 
     this.sourceSystem = sourceSystem; 
    } 
    /** 
    * @return the status 
    */ 
    public String getStatus() { 
     return status; 
    } 
    /** 
    * @param status the status to set 
    */ 
    public void setStatus(String status) { 
     this.status = status; 
    } 
    /** 
    * @return the messageTyep 
    */ 
    public String getMessageTyep() { 
     return messageTyep; 
    } 
    /** 
    * @param messageTyep the messageTyep to set 
    */ 
    public void setMessageTyep(String messageTyep) { 
     this.messageTyep = messageTyep; 
    } 
    /** 
    * @return the createdTime 
    */ 
    public String getCreatedTime() { 
     return createdTime; 
    } 
    /** 
    * @param createdTime the createdTime to set 
    */ 
    public void setCreatedTime(String createdTime) { 
     this.createdTime = createdTime; 
    } 
    /** 
    * @return the processedTime 
    */ 
    public String getProcessedTime() { 
     return processedTime; 
    } 
    /** 
    * @param processedTime the processedTime to set 
    */ 
    public void setProcessedTime(String processedTime) { 
     this.processedTime = processedTime; 
    } 
    /** 
    * @return the retryCount 
    */ 
    public String getRetryCount() { 
     return retryCount; 
    } 
    /** 
    * @param retryCount the retryCount to set 
    */ 
    public void setRetryCount(String retryCount) { 
     this.retryCount = retryCount; 
    } 
} 
+0

你使用的是什麼卡夫卡版本。 ps fax |的結果是什麼? grep kafka – divyesh

回答

0

您只需要zookeeper和kafka實例。

  1. 開始動物園管理員
  2. 開始卡夫卡
  3. 創建主題( 「NewMessageTopic」)
  4. 啓動生產者和消費者代碼

如果我理解你的權利u使用的「kafka-控制檯製作人「&」kafka-console-consumer「?他們不需要使用您的Kafka集羣。如果你的代碼工作,這應該很好。如果要通過cmd啓動kafka,你可以寫一個.bat。

:startZK 
echo Zookeeper wird gestartet 
Start "Zookeper" C:\zookeeper-3.4.9\bin\zkServer.cmd 
echo Bitte warten bis Zookeeper gestartet ist. 
pause 
echo Kafka Wird Gestartet 
Start "Kafka" C:\kafka_2.11-0.10.2.0\bin\windows\kafka-server-start.bat C:\kafka_2.11-0.10.2.0\config\server.properties 

goto Top 

在您的代碼先來看看它看的權利。 我不知道你只打印你的system.out數據嗎?

  while (true) { 
       ConsumerRecords<String, CustomMessage> messages = consumer.poll(100); 
       for (ConsumerRecord<String, CustomMessage> message : messages) { 
        System.out.println("Message received " + message);<-- just a syso not more :/ 
       } 
       perisitMessage(); <-- maybe give him the message ? 
      } 

如果你在輪廓上看到你的消息??它工作得很好。如果你今晚看不到任何我可以仔細看看。給我一個提示。但我對MongoDB沒有經驗。