假設您有來自兩個不同pubsub主題的Google雲數據流信號,並且您想要將來自一個主題的信號與來自另一個主題的信號進行比較,並生成匹配項,重新平等。如果同時(或幾乎同時)在一個主題中有多個相同的信號進入,那麼我們不應該生成匹配。是否有可能以近乎實時的方式在Dataflow中生成匹配,以便讓我們100%確定生成的匹配是否正確(即沒有誤報)?如果是這樣,你將如何實現它?由於數據可能會延遲到Dataflow的時間窗口,所以我懷疑這很難。爲了簡化,如果我們從一個pubsub主題獲得「A」,並從另一個主題獲得「A」,我們應該生成一個匹配,但只有在沒有任何其他A進入時話題幾乎在同一時間(正負1秒)。比較Google雲數據流中唯一匹配的數據集
0
A
回答
0
如果我正確理解你的問題,我會按照如下方式分解它。我將使用我們的標準四個問題,因爲它在這種情況下很有幫助。
- 「你在計算什麼?」 - 您正在進行流式加入,根據您的描述,我認爲您會希望選擇基於
CoGroupByKey
的加入。 - 「Where in event time do you want to group your data?」 - 如果事件發生在附近,您有興趣一起觀看活動。這大致對應於SDK中提供的
Sessions
。但是我不能告訴你是否希望會話通過兩個流,或者每個流。希望我的回答能讓你足夠工作。 - 「When do you want to produce output?」 - 根據您的100%確定性目標,我認爲您只有在決定放棄所有更多數據後纔會生產產量。要真正具有100%的確定性,您必須知道數據來自哪裏(這與數據流無關)。
- 「如何改進先前的輸出?」 - 由於單個連接結果沒有多個輸出,所以這不是問題。
讓我們假設你有這些輸入:
PCollection<String> streamA
PCollection<String> streamB
,如果你想基於整個流同時發生做到這一點匹配+內重複數據刪除,那麼你可以簡單窗戶進入Sessions
,並做了CoGroupByKey
:
PCollection<KV<String, String>> windowedA = streamA
.apply(WithKeys.of(String v -> v))
.apply(Window.into(
Sessions.withGapDuration(Duration.standardSeconds(1))));
PCollection<KV<String, String>> windowedB = // ditto
// Set up join handles
TupleTag<String> tagA = new TupleTag<String>() {};
TupleTag<String> tagb = new TupleTag<String>() {};
KeyedPCollectionTuple joinInput =
KeyedPCollectionTuple
.of(tagA, windowedA)
.and(tagB, windowedB);
PCollection<String> result = joinInput
// Group streams together by shared key
.apply(CoGroupByKey.create())
// Eliminate all but 1-to-1 matches
.apply(Filter.by(
KV<String, CoGbkResult> joined ->
Iterables.size(joined.getValue().getAll(tagA)) == 1
&& Iterables.size(joined.getValue().getAll(tagB)) == 1))
// The key is all we care about
.apply(Keys.create());
集合result
包含近乎同時匹配的字符串,但在一秒內沒有重複。你真正的用例可能需要一些調整。
如果您想單獨重複刪除兩個流,這稍微複雜一些,但不要太多。您需要分別爲每個流分配會話和GroupByKey
,然後根據想要如何顯示連接輸出來重新窗口化。
非常好的答案。謝謝! –