2016-12-14 89 views
0

是否有一個開始從一個特定的消費者使用我們傳遞卡夫卡時光倒流使用偏移

我知道有props.put(「auto.offset.reset」的初始屬性偏移的方式, 「最早」),但這讓我開始。

不過,我想回去如下

方案1.指定的偏移量,我想開始在 方案2.指定我要開始

的時間,我想這樣做使用初始屬性作爲首選選項。 如果這是不可能的,那麼使用一些其他的機制

附上我的簡單的消費代碼,以供參考

import java.util.Arrays; 
import java.util.Properties; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 

public class SimpleConsumer { 

    public static void main(String[] args) throws Exception { 

     String topicName = "test3"; 
     Properties props = new Properties(); 

     String groupId = "single"; 

     // Kafka consumer configuration settings 
     props.put("bootstrap.servers", "mymachine:9092"); 
     props.put("group.id", groupId); 
     props.put("enable.auto.commit", "true"); 
     props.put("auto.commit.interval.ms", "1000"); 
     props.put("session.timeout.ms", "30000"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("auto.offset.reset", "earliest"); 

     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
     consumer.subscribe(Arrays.asList(topicName)); 

     System.out.println("Starting the _NON-BATCH_ consumer ::: Topic=" + topicName+" GroupId="+groupId); 

     while (true) { 
      ConsumerRecords<String, String> records = consumer.poll(100); 
      for (ConsumerRecord<String, String> record : records) { 
       System.out.printf("%s (offset:%d, key:%s, partition = %s, topic = %s)", record.value(), record.offset(), record.key(), record.partition(), record.topic()); 
       System.out.println(); 
      } 
     } 
    } 
} 
+0

你使用哪個版本的kafka? – C4stor

回答

1

對於方案1,您可以使用KafkaConsumer.seek(TopicPartition,偏移)指定的偏移量你讀。

對於場景2,Kafka 0.10.1.0提供了KafkaConsumer.offsetsForTimes方法,允許您通過時間戳查找給定分區的偏移量,然後調用seek()方法來檢索所需的消息。