2016-09-21 63 views
3

我正在實現一個HTTP資源的迭代器,我可以恢復分頁元素列表,我試圖用一個普通的Iterator做到這一點,但它是一個阻塞實現,並且因爲我是使用akka它使我的調度員有點瘋狂。Akka流重試重複結果

我會用akka-stream來實現相同的迭代器。問題是我需要不同的重試策略。

該服務返回一個由id標識的元素列表,有時當我查詢下一頁時,該服務返回當前頁面上的相同產品。

我現在的算法是這樣的。

var seenIds = Set.empty 
var position = 0 

def isProblematicPage(elements: Seq[Element]) Boolean = { 
    val currentIds = elements.map(_.id) 
    val intersection = seenIds & currentIds 
    val hasOnlyNewIds = intersection.isEmpty 
    if (hasOnlyNewIds) { 
    seenIds = seenIds | currentIds 
    } 
    !hasOnlyNewIds 
} 

def incrementPage(): Unit = { 
    position += 10 
} 

def doBackOff(attempt: Int): Unit = { 
    // Backoff logic 
} 

@tailrec 
def fetchPage(attempt: Int = 0): Iterator[Element] = { 
    if (attempt > MaxRetries) { 
    incrementPage() 
    return Iterator.empty 
    } 

    val eventualPage = service.retrievePage(position, position + 10) 

    val page = Await.result(eventualPage, 5 minutes) 

    if (isProblematicPage(page)) { 
    doBackOff(attempt) 
    fetchPage(attempt + 1) 
    } else { 
    incrementPage() 
    page.iterator 
    } 
} 

我正在使用akka-streams實施,但我無法弄清楚如何累積使用流結構重複的頁面和測試。

有什麼建議嗎?

+1

應該在'fetchPage'是'return'線'返回Iterator.empty'? –

回答

1

Flow.scan階段是一個很好的建議,但它缺乏處理期貨的功能,所以我實現了異步版本Flow.scanAsync它現在可以在akka 2.4.12上使用。

當前的實現是:

val service: WebService 
val maxTries: Int 
val backOff: FiniteDuration 

def retry[T](zero: T, attempt: Int = 0)(f: => Future[T]): Future[T] = { 
    f.recoverWith { 
    case ex if attempt >= maxAttempts => 
     Future(zero) 
    case ex => 
     akka.pattern.after(backOff, system.scheduler)(retry(zero, attempt + 1)(f)) 
    } 
} 

def isProblematicPage(lastPage: Seq[Element], currPage: Seq[Element]): Boolean = { 
    val lastPageIds = lastPage.map(_.id).toSet 
    val currPageIds = currPage.map(_.id).toSet 
    val intersection = lastPageIds & currPageIds 
    intersection.nonEmpty 
} 

def retrievePage(lastPage: Seq[Element], startIndex: Int): Future[Seq[Element]] = { 
    retry(Seq.empty) { 
    service.fetchPage(startIndex).map { currPage: Seq[Element] => 
     if (isProblematicPage(lastPage, currPage)) throw new ProblematicPageException(startIndex) 
     else currPage 
    } 
    } 
} 


val pagesRange: Range = Range(0, maxItems, pageSize) 

val scanAsyncFlow = Flow[Int].via(ScanAsync(Seq.empty)(retrievePage)) 

Source(pagesRange) 
    .via(scanAsyncFlow) 
    .mapConcat(identity) 
    .runWith(Sink.seq) 

拉蒙感謝您的意見:)

2

Flow.scan方法在這種情況下很有用。

type Position = Int 

//0,10,20,... 
def positionIterator() : Iterator[Position] = Iterator from (0,10) 

val positionSource : Source[Position,_] = Source fromIterator positionIterator 

這個位置源就可以被定向到一個Flow.scan它採用類似於您fetchPage(側面說明一個功能:你應該避免等待着儘可能多的

我將與位置的源啓動流盡可能地,有一種方法不需要在代碼中等待,但這超出了原始問題的範圍)。新的功能需要採取在已經看到元素的「狀態」:

def fetchPageWithState(service : Service) 
         (seenEls : Set[Element], position : Position) : Set[Elements] = { 

    val maxRetries = 10 

    val seenIds = seenEls map (_.id) 

    @tailrec 
    def readPosition(attempt : Int) : Seq[Elements] = { 
    if(attempt > maxRetries) 
     Iterator.empty 
    else { 
     val eventualPage : Seq[Element] = 
     Await.result(service.retrievePage(position, position + 10), 5 minutes) 

     if(eventualPage.map(_.id).exists(seenIds.contains)) { 
     doBackOff(attempt) 
     readPosition(attempt + 1) 
     } 
     else 
     eventualPage    
    } 
    }//end def readPosition 

    seenEls ++ readPosition(0).toSet 
}//end def fetchPageWithState 

這可現在Flow內使用:

def fetchFlow(service : Service) : Flow[Position, Set[Element],_] = 
    Flow[Position].scan(Set.empty[Element])(fetchPageWithState(service)) 

新的流程可以很容易地連接到你的位置源創造一個Set[Element]來源:

def elementsSource(service : Service) : Source[Set[Element], _] = 
    positionSource via fetchFlow(service) 

elementsSource每個新值將是一個不斷增長的設置從抓取網頁的獨特元素秒。

+0

感謝您的回覆,它非常有用! 我試圖從代碼中移除'Await',並使用你的實現,它不支持,因爲'seenElements'需要是'Future [Set [Element]]',所以它會鏈接掃描的解決。 我會嘗試'foldAsync'看看它是否有幫助:) –

+0

@mateusduboli歡迎您,愉快的黑客入侵。 –