2017-02-21 74 views
0

我試圖實現一個使用Lagom 1.2.2設置的消息代理,並且碰到了牆壁。該文件對服務描述符下面的例子:Lagom消息代理的完整示例

default Descriptor descriptor() { 
return named("helloservice").withCalls(...) 
    // here we declare the topic(s) this service will publish to 
    .publishing(
    topic("greetings", this::greetingsTopic) 
) 
    ....; 
} 

而這個例子的實現:

public Topic<GreetingMessage> greetingsTopic() { 
return TopicProducer.singleStreamWithOffset(offset -> { 
    return persistentEntityRegistry 
     .eventStream(HelloEventTag.INSTANCE, offset) 
     .map(this::convertEvent); 
    }); 
} 

然而,有什麼樣的參數類型或返回convertEvent()函數的類型是沒有例子,這是我畫空白的地方。在另一端,在用戶向MessageBroker,似乎它的消費GreetingMessage對象,但是當我創建了一個功能convertEvent返回GreetingMessage對象,我得到一個編譯錯誤:

Error:(61, 21) java: method map in class akka.stream.javadsl.Source<Out,Mat> cannot be applied to given types; 
    required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T> 
    found: this::convertEvent 
    reason: cannot infer type-variable(s) T 
    (argument mismatch; invalid method reference 
    incompatible types: akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset> cannot be converted to com.example.GreetingMessage) 

是否有其他更徹底如何使用這個例子?我已經在Chirper示例應用程序中進行了檢查,但似乎沒有這方面的示例。

謝謝!

回答

3

您粘貼的錯誤消息告訴你到底是什麼map預計:

required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T> 

所以,你需要通過一個函數,它Pair<GreetingEvent, Offset>。函數應該返回什麼?那麼,更新它來獲取它,然後你會得到下一個錯誤,它會再次告訴你它期望你返回什麼,在這種情況下,你會發現它是Pair<GreetingMessage, Offset>

要解釋這些類型 - Lagom需要跟蹤的事件已發佈到卡夫卡,以便當您重新啓動服務,它不會從您的事件日誌的開頭開始並重新從所有事件開始的時間再次。它通過使用偏移來實現這一點。因此,事件日誌會生成事件和偏移量對,然後您需要將這些事件轉換爲將發佈到Kafka的消息,並且當您將轉換後的消息返回給Lagom時,它需要與偏移量成對您從事件日誌中獲得的信息,以便在向Kafka發佈後,Lagom可以保留偏移量,並在下次重新啓動服務時將其用作起始點。

完整的例子可以看這裏:https://github.com/lagom/online-auction-java/blob/a32e696/bidding-impl/src/main/java/com/example/auction/bidding/impl/BiddingServiceImpl.java#L91

+0

感謝;我一直在仔細研究你提供的拍賣示例應用程序,它是有幫助的(儘管它使用'taggedStreamWithOffset'而不是'singleStreamWithOffset'(說實話,可能是我想要的) 部分I失蹤的是'convertEvent'的參數;出於某些原因,我假設我搞亂了返回類型,即使當我的返回類型正確並導致我誤入歧途時也是如此。 再次感謝您的幫助! –