[注意]問題是Lagom框架的具體問題!Lagom PubSubRef訂閱者刪除消息
在我目前的項目中,當上流速度很高並且看起來下游無法及時處理所有消息時,觀察到將來自Source的消息列表剪切成Kafka主題發佈者的問題已被觀察到。由於意識到,切割有關PubSubRef.subscribe()方法https://github.com/lagom/lagom/blob/master/pubsub/javadsl/src/main/scala/com/lightbend/lagom/javadsl/pubsub/PubSubRef.scala#L85
行爲的完整的方法定義:
def subscriber(): Source[T, NotUsed] = {
scaladsl.Source.actorRef[T](bufferSize, OverflowStrategy.dropHead)
.mapMaterializedValue { ref =>
mediator ! Subscribe(topic.name, ref)
NotUsed
}.asJava
}
有OverflowStrategy.dropHead使用。是否可以更改爲使用back-pressure strategy?
UPD#1: 用例是非常簡單的,當一個查詢請求被髮布到命令主題,把它和查詢從數據庫表對象,結果列表被推入結果卡夫卡的話題。代碼段:
objectsResultTopic = pubSub.refFor(TopicId.of(CustomObject.class, OBJECTS_RESULT_TOPIC));
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
Flow.fromFunction(this::deserializeCommandAndQueryObjects)
.mapAsync(concurrency, objects -> objects)
.flatMapMerge(concurrency, objects -> objects)
.alsoTo(Sink.foreach(event -> LOG.trace("Sending object {}", object)))
.to(objectsResultTopic.publisher()),
Source.repeat(Done.getInstance())
)
)
在對象的情況下,流生成由deserializeCommandAndQueryObjects
函數大於默認緩衝器大小= 1000它啓動切割元件(我們的情況下是〜2.5K對象)。
UPD#2: 對象數據的來源是:
// returns CompletionStage<Source<CustomObject, ?>>
jdbcSession.withConnection(
connection -> Source.from(runQuery(connection, rowConverter))
)
而且還有一個訂閱卡夫卡objectsResultTopic
:
TopicProducer.singleStreamWithOffset(
offset -> objectsResultTopic.subscriber().map(gm -> {
JsonNode node = mapper.convertValue(gm, JsonNode.class);
return Pair.create(node, offset);
}));
發佈到'objectQueryTopic'中的數據的來源是什麼?什麼是訂閱'objectsResultTopic'重申,你在這裏使用的'objectsResultTopic'的API並不使用Kafka。 –
我可能會被一堆代碼片段混淆,但主要想法是獲取PubSubRef'ObjectsResultTopic'並訂閱Kafka topic = OBJECTS_RESULT_TOPIC,並將通過流加載的數據從DB源加載到objectsResultTopic.publisher()中以發佈它們在結果主題。 – VRomaN
對於StackOverflow問答,這可能會有點複雜,是的:)我認爲底線是'PubSubRef'不是這項工作的最佳工具。這聽起來像是你想要從卡夫卡主題中讀取數據,轉換數據,然後將結果寫入另一個主題。那是對的嗎? –