2017-07-03 98 views
0

[注意]問題是Lagom框架的具體問題!Lagom PubSubRef訂閱者刪除消息

在我目前的項目中,當上流速度很高並且看起來下游無法及時處理所有消息時,觀察到將來自Source的消息列表剪切成Kafka主題發佈者的問題已被觀察到。由於意識到,切割有關PubSubRef.subscribe()方法https://github.com/lagom/lagom/blob/master/pubsub/javadsl/src/main/scala/com/lightbend/lagom/javadsl/pubsub/PubSubRef.scala#L85

行爲的完整的方法定義:

def subscriber(): Source[T, NotUsed] = { 
scaladsl.Source.actorRef[T](bufferSize, OverflowStrategy.dropHead) 
    .mapMaterializedValue { ref => 
    mediator ! Subscribe(topic.name, ref) 
    NotUsed 
    }.asJava 
} 

OverflowStrategy.dropHead使用。是否可以更改爲使用back-pressure strategy

UPD#1: 用例是非常簡單的,當一個查詢請求被髮布到命令主題,把它和查詢從數據庫表對象,結果列表被推入結果卡夫卡的話題。代碼段:

objectsResultTopic = pubSub.refFor(TopicId.of(CustomObject.class, OBJECTS_RESULT_TOPIC)); 
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
    Flow.fromFunction(this::deserializeCommandAndQueryObjects) 
     .mapAsync(concurrency, objects -> objects) 
     .flatMapMerge(concurrency, objects -> objects) 
     .alsoTo(Sink.foreach(event -> LOG.trace("Sending object {}", object))) 
     .to(objectsResultTopic.publisher()), 
    Source.repeat(Done.getInstance()) 
    ) 
) 

在對象的情況下,流生成由deserializeCommandAndQueryObjects函數大於默認緩衝器大小= 1000它啓動切割元件(我們的情況下是〜2.5K對象)。

UPD#2: 對象數據的來源是:

// returns CompletionStage<Source<CustomObject, ?>> 
jdbcSession.withConnection(
    connection -> Source.from(runQuery(connection, rowConverter)) 
) 

而且還有一個訂閱卡夫卡objectsResultTopic

TopicProducer.singleStreamWithOffset(
offset -> objectsResultTopic.subscriber().map(gm -> { 
    JsonNode node = mapper.convertValue(gm, JsonNode.class); 
    return Pair.create(node, offset); 
})); 
+0

發佈到'objectQueryTopic'中的數據的來源是什麼?什麼是訂閱'objectsResultTopic'重申,你在這裏使用的'objectsResultTopic'的API並不使用Kafka。 –

+0

我可能會被一堆代碼片段混淆,但主要想法是獲取PubSubRef'ObjectsResultTopic'並訂閱Kafka topic = OBJECTS_RESULT_TOPIC,並將通過流加載的數據從DB源加載到objectsResultTopic.publisher()中以發佈它們在結果主題。 – VRomaN

+0

對於StackOverflow問答,這可能會有點複雜,是的:)我認爲底線是'PubSubRef'不是這項工作的最佳工具。這聽起來像是你想要從卡夫卡主題中讀取數據,轉換數據,然後將結果寫入另一個主題。那是對的嗎? –

回答

1

如果有人有興趣,最後我們解決了這個問題,通過使用阿卡製片API,如:

ProducerSettings<String, CustomObject> producerSettings = ProducerSettings.create(system, new StringSerializer(), new CustomObjectSerializer()); 
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
    Flow.fromFunction(this::deserializeCommandAndQueryObjects) 
     .mapAsync(concurrency, objects -> objects) 
     .flatMapMerge(concurrency, objects -> objects) 
     .alsoTo(Sink.foreach(object -> LOG.trace("Sending event {}", object))) 
     .map(object -> new ProducerRecord<String, CustomObject>(OBJECTS_RESULT_TOPIC, object)) 
     .to(Producer.plainSink(producerSettings)), 
    Source.repeat(Done.getInstance()))); 

它的工作原理沒有緩衝,只是使推到卡夫卡的話題。

+0

太好了!在閱讀之前我發佈了我的最後一條評論,表明您解決了這個問題這看起來像是一個很好的解決方案。 –

3

這聽起來像Lagom的distributed publish-subscribe功能可能不你有工作的最佳工具。

你的問題提到卡夫卡,但這個功能並沒有利用卡夫卡。相反,它通過直接向羣集中的所有用戶廣播消息來工作。這是一次「最多一次」的消息傳遞,它可能確實會丟失消息,並且針對那些關注最新消息而不是處理每一個消息的消費者。溢出策略不是可定製的,您不希望在這些用例中使用背壓,因爲這意味着一個緩慢的用戶可能會減慢向所有其他用戶的交付。

有,你有一些其他選項:

  1. 如果你想使用卡夫卡,你應該使用Lagom的message broker API。這支持「至少一次」的傳遞語義,並且可以用來確保每個消費者處理每個消息(代價是可能增加延遲)。在這種情況下,卡夫卡扮演着一個巨大的持久緩衝區,所以它甚至比背壓更好:生產者和消費者可以以不同的速度前進,並且(當與partitioning一起使用時),您可以添加消費者以便擴展在需要時更快速地處理消息。

    當生產者和消費者都在同一個服務中時,可以使用消息代理API,但它特別適用於服務之間的通信。

  2. 如果您發送的消息是持久性實體事件,並且消費者是同一服務的一部分,那麼persistent read-side processor可能是一個不錯的選擇。

    這也提供了「至少一次」發貨,而如果處理消息的唯一影響是數據庫更新,那麼內置的Cassandra read-side databasesrelational read-side databases支持提供「有效的一次」語義,在數據庫更新事務運行以確保在事件處理期間發生的故障不會導致部分更新。

  3. 如果您發送的消息是持久性實體事件,消費者是同一服務的一部分,但您希望將事件作爲流處理,則可以訪問raw stream of events

  4. 如果您的用例不符合Lagom明確支持的用例之一,則可以使用更低級別的Akka API(包括distributed publish-subscribe)來實現更適合您需求的某些內容。

最好的選擇將取決於你的用例的具體情況:消息的來源和你想要的消費者類型。如果您通過更多詳細信息更新您的問題併爲此答案添加評論,我可以使用更具體的建議來編輯答案。

+0

你好!感謝您的回覆,我更新了問題並添加了受此問題影響的代碼段中的實際用例。 – VRomaN