2017-07-31 51 views
5

我正在生產項目,消耗多個協同例程並推回resultChannel。製片人在最後一項之後關閉頻道。扇出/扇入式結果通道

由於resultChannel永遠不會關閉,所以代碼永遠不會結束。如何檢測並正確完成迭代,以便hasNext()返回false

val inputData = (0..99).map { "Input$it" } 
val threads = 10 

val bundleProducer = produce<String>(CommonPool, threads) { 
    inputData.forEach { item -> 
     send(item) 
     println("Producing: $item") 
    } 

    println("Producing finished") 
    close() 
} 

val resultChannel = Channel<String>(threads) 

repeat(threads) { 
    launch(CommonPool) { 
     bundleProducer.consumeEach { 
      println("CONSUMING $it") 
      resultChannel.send("Result ($it)") 
     } 
    } 
} 

val iterator = object : Iterator<String> { 
    val iterator = resultChannel.iterator() 
    override fun hasNext() = runBlocking { iterator.hasNext() } 
    override fun next() = runBlocking { iterator.next() } 
}.asSequence() 

println("Starting interation...") 

val result = iterator.toList() 

println("finish: ${result.size}") 
+0

的我發現的方法就是在結果序列中取(100),但我不確定它離開底層結構的狀態。 – atok

回答

3

您可以運行等待消費者完成的協程,然後關閉resultChannel

首先,重寫啓動消費者節省了Job是代碼:

val jobs = (1..threads).map { 
    launch(CommonPool) { 
     bundleProducer.consumeEach { 
      println("CONSUMING $it") 
      resultChannel.send("Result ($it)") 
     } 
    } 
} 

然後運行另一個協程封閉,一旦所有的Job s的做渠道:

launch(CommonPool) { 
    jobs.forEach { it.join() } 
    resultChannel.close() 
}