2017-07-21 61 views
-1

我有一個要求,要證明kafka生產者可以生成100萬條 消息/秒給Kafka集羣,然後評估其性能。用kafka生產100萬條消息/秒

我怎樣才能達到100萬條信息/秒?

J'AI UNE選擇COMME CA倒UNE生產卡夫卡:

public static void main(String args[]){ 

    Random rnd = new Random(); 

    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    Producer<String, String> producer = new KafkaProducer<String, String>(props); 
    int counter=0; 
    int i = 0; 
    while (true){ 
     TimeZone tz = TimeZone.getTimeZone("UTC"); 
     DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm.sss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset 
     df.setTimeZone(tz); 
     String nowAsISO = df.format(new Date()); 
     ++counter; 

      final String message = "sensor" + i + ":" + Integer.toString(rnd.nextInt(10000)) + " " + String.valueOf(rnd.nextDouble())+ " " + 
        "MyDevice" + " " + 
        "Sensor" + " " + "Sensing" + " " + "Property" + " " + "Unit" + " " + "9845A" + " " + nowAsISO ; 


     try { 
      producer.send(new ProducerRecord<String, String>("test", message), new Callback() { 
         public void onCompletion(RecordMetadata recordMetadata, Exception e) { 
          if (e == null){ 
           System.out.println("Partition: "+recordMetadata.partition() 
             +", Offset" + recordMetadata.offset() 
             + ", timestamp: " + recordMetadata.timestamp()); 
           System.out.println(message); 
          } 
          else { 
           e.printStackTrace(); 
          } 
         } 
        } 
      ); 

      i++; 
      TimeUnit.SECONDS.sleep(1000); 
     } 
     catch (InterruptedException e ){ 
      System.out.println("I was interrupted."); 
     } 
    } 

} 

}

謝謝!

+0

你可以在一個while循環內發佈一個常量字符串一百萬次,在循環之後刷新生產者並且將其刪除。具有TimeUnit.SECONDS.sleep(1000)的 –

+1

是自我挫敗。在while循環之後我會說flush,並且在i ++ <1mil上預測循環,並且執行post和pre循環時間的時間差。 –

+1

打印到控制檯也很慢 –

回答