0
我的akka流在一個元素之後停止。這是我的流:Akka流在一個元素後停止
val firehoseSource = Source.actorPublisher[FirehoseActor.RawTweet](
FirehoseActor.props(
auth = ...
)
)
val ref = Flow[FirehoseActor.RawTweet]
.map(r => ResponseParser.parseTweet(r.payload))
.map { t => println("Received: " + t); t }
.to(Sink.onComplete({
case Success(_) => logger.info("Stream completed")
case Failure(x) => logger.error(s"Stream failed: ${x.getMessage}")
}))
.runWith(firehoseSource)
FirehoseActor
連接到Twitter firehose並將消息緩衝到隊列。當演員接收Request
消息時,它take
S中的下一個元素,並返回它:
def receive = {
case Request(_) =>
logger.info("Received request for next firehose element")
onNext(RawTweet(queue.take()))
}
的問題是,只有一個單一的鳴叫正在打印到控制檯。該程序不會退出或拋出任何錯誤,我已經在周圍散佈了日誌記錄,並且沒有打印任何錯誤。
我認爲接收器會繼續施加壓力來拉動元件,但似乎並不是這樣,因爲Sink.onComplete
中的消息都沒有打印出來。我也嘗試使用Sink.ignore
,但也只打印單個元素。演員中的日誌消息也只會打印一次。
我需要使用什麼接收器才能使其無限期地通過流?