我正在實現一個HTTP資源的迭代器,我可以恢復分頁元素列表,我試圖用一個普通的Iterator
做到這一點,但它是一個阻塞實現,並且因爲我是使用akka
它使我的調度員有點瘋狂。Akka流重試重複結果
我會用akka-stream
來實現相同的迭代器。問題是我需要不同的重試策略。
該服務返回一個由id
標識的元素列表,有時當我查詢下一頁時,該服務返回當前頁面上的相同產品。
我現在的算法是這樣的。
var seenIds = Set.empty
var position = 0
def isProblematicPage(elements: Seq[Element]) Boolean = {
val currentIds = elements.map(_.id)
val intersection = seenIds & currentIds
val hasOnlyNewIds = intersection.isEmpty
if (hasOnlyNewIds) {
seenIds = seenIds | currentIds
}
!hasOnlyNewIds
}
def incrementPage(): Unit = {
position += 10
}
def doBackOff(attempt: Int): Unit = {
// Backoff logic
}
@tailrec
def fetchPage(attempt: Int = 0): Iterator[Element] = {
if (attempt > MaxRetries) {
incrementPage()
return Iterator.empty
}
val eventualPage = service.retrievePage(position, position + 10)
val page = Await.result(eventualPage, 5 minutes)
if (isProblematicPage(page)) {
doBackOff(attempt)
fetchPage(attempt + 1)
} else {
incrementPage()
page.iterator
}
}
我正在使用akka-streams
實施,但我無法弄清楚如何累積使用流結構重複的頁面和測試。
有什麼建議嗎?
應該在'fetchPage'是'return'線'返回Iterator.empty'? –