2017-05-31 61 views
1

我下面this answer 創建使用阿卡流的SQS消費者:阿卡流:最後一個元素缺少來源

def queryForMessages = { 
    val messages = Sqs.receive(queueUrl, 3, 10) 
    println(s"Received from sqs: ${messages.map(_.getBody)}") 
    messages 
    } 

    def messageListStream : immutable.Stream[Iterable[SqsMessage]] = { 
    queryForMessages #:: messageListStream 
    } 

    def messageIterator() : Iterator[SqsMessage] = messageListStream.flatten.toIterator 

    Source.fromIterator(messageIterator) 
    .map(_.getBody) 
    .runForeach(m => println(s"Stream output: $m"))(materializer) 

這一切似乎從工作其實,除了這一個從收到的最後一個元素該隊列不會被流拾取。 即如果我發佈四個項目到sqs,其中只有3個打印出來的流(項目「2」丟失)。我得到的輸出是:

Received from sqs: List(1) 
Received from sqs: List(3, 4, 2) 
Stream output: 1 
Stream output: 3 
Stream output: 4 
Received from sqs: List() 
Received from sqs: List() 

缺少的元素(2)不實際出現但如果我發佈一些更多的元素:

Received from sqs: List(5) 
Stream output: 2 
Received from sqs: List(6) 
Stream output: 5 

任何想法?

+0

我給你在你的問題中引用的答案。我懷疑問題是由評估流尾的懶惰本質引起的。嘗試這個:'def messageIterator()= Iterator.continually(messageListStream).flatMap(identity)' –

+0

'Iterator.continually(queryForMessages).flatMap(identity) –

回答

0

正如我在評論部分寫的那樣,我認爲問題的根源在於Streams以及懶惰的尾部評估被用作媒介。

的流成分是不必要的,因爲一個單獨Iterator可以解決的問題:

val messageListIterator :() => Iterator[Iterable[SqsMessage]] = 
() => Iterator continually queryForMessages 

val messageIterator :() => Iterator[SqsMessage] = 
() => messageListIterator() flatMap identity 

這現在可以在你的阿卡流Source使用:

Source 
    .fromIterator(messageIterator) 
    .map(_.getBody) 
    .runForeach(m => println(s"Stream output: $m"))(materializer) 

我已經相應地更新the linked question