1
我下面this answer 創建使用阿卡流的SQS消費者:阿卡流:最後一個元素缺少來源
def queryForMessages = {
val messages = Sqs.receive(queueUrl, 3, 10)
println(s"Received from sqs: ${messages.map(_.getBody)}")
messages
}
def messageListStream : immutable.Stream[Iterable[SqsMessage]] = {
queryForMessages #:: messageListStream
}
def messageIterator() : Iterator[SqsMessage] = messageListStream.flatten.toIterator
Source.fromIterator(messageIterator)
.map(_.getBody)
.runForeach(m => println(s"Stream output: $m"))(materializer)
這一切似乎從工作其實,除了這一個從收到的最後一個元素該隊列不會被流拾取。 即如果我發佈四個項目到sqs,其中只有3個打印出來的流(項目「2」丟失)。我得到的輸出是:
Received from sqs: List(1)
Received from sqs: List(3, 4, 2)
Stream output: 1
Stream output: 3
Stream output: 4
Received from sqs: List()
Received from sqs: List()
缺少的元素(2)不實際出現但如果我發佈一些更多的元素:
Received from sqs: List(5)
Stream output: 2
Received from sqs: List(6)
Stream output: 5
任何想法?
我給你在你的問題中引用的答案。我懷疑問題是由評估流尾的懶惰本質引起的。嘗試這個:'def messageIterator()= Iterator.continually(messageListStream).flatMap(identity)' –
'Iterator.continually(queryForMessages).flatMap(identity) –