我有2個數據流,我希望能夠加入他們1個月的窗口,讓我們說。當我有一個實時數據,一切都是樂趣和超級容易KStream和加入。我做了這樣的事情;卡夫卡流與JoinWindow工作進行數據回放
KStream<String, GenericRecord> stream1 =
builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic1());
KStream<String, GenericRecord> stream2 =
builder.stream(Serdes.String(), new CustomizeAvroSerde<>(this.getSchemaRegistryClient(), this.getKafkaPropsMap()), getKafkaConsumerTopic2());
long joinWindowSizeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days
KStream<String, GenericRecord> joinStream = stream1.join(stream2,
new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
@Override
public GenericRecord apply(GenericRecord genericRecord, GenericRecord genericRecord2) {
final GenericRecord jonnedRecord = new GenericData.Record(jonnedRecordSchema);
....
....
....
return jonnedRecord;
}
}, JoinWindows.of(joinWindowSizeMs));
當我想要進行數據重播時出現問題。假設我想重新執行這些連接以獲得過去6個月的數據,因爲我一次運行所有數據的管道,kafkaStream將連接所有可連接的數據,並且不考慮時間差異它應該只加入過去一個月的數據)。我假設JoinWindow時間是我們將數據插入到Kafka主題中的時間,對嗎?
以及我如何改變和操縱這個時間,所以我可以運行我的數據重播正確,我的意思是重新插入這些過去的6個月的數據應該需要一個月的時間之窗,每一個相應的記錄,並加入基於一個。
這個問題是不能重複的How to manage Kafka KStream to Kstream windowed join?,還有我問我怎麼能可以根據時間窗口上加入。這裏我正在談論數據重放。根據我的理解,在加入Kafka時需要將數據作爲JoinWindow的時間插入到主題中,因此,如果您想要執行數據重放並在6個月前重新插入數據,則kafka會將其作爲新數據今天插入,並將與其他一些實際上適用於今天的其他數據一起加入。
[如何管理Kafka KStream到Kstream窗口連接?](http://stackoverflow.com/questions/41707854/how-to-manage-kafka-kstream-to-kstream-windowed-join) –
@ MatthiasJ.Sax不是,這個問題完全不同,我已經編輯了一些問題。 – Am1rr3zA
恢復了我的近距離投票。 –