2017-02-28 65 views
1

我想爲我們的某些人實現數據重放,爲此,我需要使用Kafka保留策略(因爲我使用的是join,並且我需要窗口時間準確)。 P.S.我使用的卡夫卡版本0.10.1.1Kafka保留策略不能按預期工作

我送我的數據轉化爲這樣的話題:

kafkaProducer.send(
        new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r) 
      ); 

我創造我的題目是這樣的:

卡夫卡的話題 - 創建 - -zookeeper localhost:2181 - 複製因子1 --partition 1 - topic myTopic
kafka-topics --zookeeper localhost --alter --topic myTopic --config retention.ms = 172800000 kafka-topics --zookeeper localhost --alter --topic myTopic --config segment.ms = 172800000

因此,通過上述設置,我應該將我的主題的保留時間設置爲48小時。

我擴展TimestampExtractor爲了記錄每條消息的實際時間。

public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor { 
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class); 
    @Override 
    public long extract(ConsumerRecord<Object, Object> consumerRecord) { 
     LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp())); 
     return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis(); 
    } 
} 

對於測試,我發送了4條消息到我的主題,我得到這4條日誌消息。

2017年2月28日10時23分39秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 1488295086292人類readble -Tue 2月28日10時18分06秒美國東部時間2017年
2017年2月28日10:24 :01 INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:14832.72億人readble -Sun 1月1 07:00:00 EST 2017年
2017年2月28日10時26分十一秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 1485820800000人類readble -Mon 1月30日19:00:00東部時間2017年
2017-02-28 10點27分22秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:1488295604411人類readble -Tue 2月28日10點26分44秒美國東部時間2017年

因此,基於​​我希望看到我的兩個中傳遞消息得到清除/刪除5分鐘後(第2次和第3次,因爲它們是1月1日和1月30日)。但我試圖消耗我的話題一個小時,每次我消費我的話題我收到了所有4條消息。

卡夫卡的Avro控制檯消費者--zookeeper本地主機:2181 --from-開始--topic myTopic

我卡夫卡的配置是這樣的:

############################# Log Retention Policy ############################# 

# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 

# The minimum age of a log file to be eligible for deletion 
log.retention.hours=168 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining 
# segments don't drop below log.retention.bytes. 
#log.retention.bytes=1073741824 

# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=1073741824 

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=300000 

上午我做錯了什麼或者我錯過了什麼?

回答

5

卡夫卡通過刪除日誌段來實現其保留策略。 Kafka永遠不會刪除活動段,這是它將添加發送到分區的新消息的段。卡夫卡只刪除舊的細分市場。卡夫卡輥主動段成舊段當一個新的消息被髮送到的分區,無論是

  • 與新的消息中的活性段的大小將超過log.segment.bytes,或
  • 第一時間戳在活動時間段消息比log.roll.ms以上(默認爲7天)

因此,在你的榜樣,你必須週二2月28日10時18分06秒美國東部時間2017年後要等7天,發送一個新的消息,那麼所有4個初始消息將被刪除。

+0

如果是這樣怎麼解釋當我用1970年的時間戳(非常舊的消息)發送兩條消息後5分鐘都被刪除? – Am1rr3zA