2017-01-23 67 views
2

我有2個數據流,我希望能夠加入他們1個月的窗口,讓我們說。當我有一個實時數據,一切都是樂趣和超級容易KStream加入。我做了這樣的事情;卡夫卡流與JoinWindow工作進行數據回放

KStream<String, GenericRecord> stream1 = 
      builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1()); 

KStream<String, GenericRecord> stream2 = 
      builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2()); 

long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days 

    KStream<String, GenericRecord> joinStream = stream1.join(stream2, 
      new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() { 
       @Override 
       public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) { 
        final GenericRecord jonnedRecord = new GenericData.Record(jonnedRecordSchema); 
        .... 
        .... 
        .... 

        return jonnedRecord; 
       } 
      }, JoinWindows.of(joinWindowSizeMs)); 

當我想要進行數據重播時出現問題。假設我想重新執行這些連接以獲得過去6個月的數據,因爲我一次運行所有數據的管道,kafkaStream將連接所有可連接的數據,並且不考慮時間差異它應該只加入過去一個月的數據)。我假設JoinWindow時間是我們將數據插入到Kafka主題中的時間,對嗎?
以及我如何改變和操縱這個時間,所以我可以運行我的數據重播正確,我的意思是重新插入這些過去的6個月的數據應該需要一個月的時間之窗,每一個相應的記錄,並加入基於一個。

這個問題是不能重複的How to manage Kafka KStream to Kstream windowed join?,還有我問我怎麼能可以根據時間窗口上加入。這裏我正在談論數據重放。根據我的理解,在加入Kafka時需要將數據作爲JoinWindow的時間插入到主題中,因此,如果您想要執行數據重放並在6個月前重新插入數據,則kafka會將其作爲新數據今天插入,並將與其他一些實際上適用於今天的其他數據一起加入。

+0

[如何管理Kafka KStream到Kstream窗口連接?](http://stackoverflow.com/questions/41707854/how-to-manage-kafka-kstream-to-kstream-windowed-join) –

+0

@ MatthiasJ.Sax不是,這個問題完全不同,我已經編輯了一些問題。 – Am1rr3zA

+0

恢復了我的近距離投票。 –

回答

3

卡夫卡的流API使用由TimestampExtractor返回時間戳來計算聯接。默認情況下,這是記錄的嵌入式元數據時間戳。 (c.f. http://docs.confluent.io/current/streams/concepts.html#time

默認情況下,KafkaProducer將此時間戳設置爲寫入時的當前系統時間。 (作爲替代方案,你可以在每個話題的基礎在券商存儲記錄的時間覆蓋與經紀人的系統時間記錄生產者提供的時間戳配置代理 - 這提供「攝取時間」的語義。)

因此,它本身不是Kafka Streams問題。

可以有多種選擇來解決這個問題:

  1. 如果您的數據已經是一個主題,你可以簡單地重置您的Streams應用程序重新處理舊數據。爲此,您可以使用應用程序重置工具(bin/kafka-streams-application-reset.sh)。您還需要在Streams應用中指定auto.offset.reset策略爲earliest。查看文檔 - 另外,建議閱讀博文。

這是最好的辦法,因爲你不需要再次寫入數據的話題。

  • 如果你的數據不是一個主題,你需要寫數據,你可以在應用程序級別明確設置記錄的時間戳,通過爲每個記錄提供的時間戳:
  • KafkaProducer producer = new KafkaProducer(...); 
    producer.send(new ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)); 
    

    因此,如果你攝取的舊數據可以明確地設置時間戳和卡夫卡流將它撿起來,並計算相應的加入。

    +0

    因爲在數據重放期間,我需要將數據重新插入到我需要使用第二種方法的主題中。我會嘗試你的解決方案,看起來很棒。 – Am1rr3zA