2010-09-26 70 views
13

基本上我想這個轉換:發生器/塊到迭代器/流轉換

def data(block: T => Unit) 

到Stream(dataToStream是爲此轉換的假想功能):

val dataStream: Stream[T] = dataToStream(data) 

我想這問題可以通過延續解決:

// let's assume that we don't know how data is implemented 
// we just know that it generates integers 
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) } 

// here we can print all data integers 
data { i => println(i) } 

// >> but what we really want is to convert data to the stream << 

// very dumb solution is to collect all data into a list 
var dataList = List[Int]() 
data { i => dataList = i::dataList } 
// and make a stream from it 
dataList.toStream 

// but we want to make a lazy, CPU and memory efficient stream or iterator from data 
val dataStream: Stream[Int] = dataToStream(data) 
dataStream.foreach { i => println(i) } 

// and here a black magic of continuations must be used 
// for me this magic is too hard to understand 
// Does anybody know how dataToStream function could look like? 

謝謝,Dawid

+0

恩惠無線答案,或有說服力的論點,沒有任何。 – 2010-09-28 18:27:08

+1

你的「塊」不會產生任何值。這怎麼能變成流?單位是單身。 – 2010-09-28 18:38:23

+0

所需的流是發送到「塊」的一系列參數,而不是這些調用的結果。 – 2010-09-28 19:02:59

回答

2

我還是要弄清楚自己該怎麼做。我懷疑,答案就在這裏的某個地方:

編輯:刪除代碼,演示瞭如何解決的不同的問題。

EDIT2:使用最初發布http://gist.github.com/574873代碼http://gist.github.com/580157,你可以這樣做:

object Main { 
    import Generator._ 

    def data = generator[Int] { yld => 
    for (i <- suspendable(List.range(0, 11))) yld(i) 
    } 

    def main(args: Array[String]) { 
    for(i <- data.toStream) println(i) 
    } 
} 

data不採取分組碼,但我認爲這是很好的,因爲與延續,塊可以由主叫方處理。發生器的代碼可以在github的要點中看到。

+0

埃爾姆,你沒有解決一個完全不同於OP的問題嗎? OP的'data'函數被稱爲''block'函數十次,他想把它變成十個元素的流。你的'data'函數只調用一次'block'。 – sepp2k 2010-09-26 16:58:59

+0

@ sepp2k,是的,是的。那麼我猜延續是必要的。 – huynhjl 2010-09-26 17:27:18

+0

我試圖從這個線程使用代碼http://stackoverflow.com/questions/2201882/implementing-yield-yield-return-using-scala-continuations/3758084但沒有成功 – 2010-09-26 20:14:46

3

下面是一個簡單的解決方案,它產生一個消耗數據的線程。它將數據發佈到SynchronousQueue。從隊列中提取數據被創建並返回一個流:

def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = { 
    val queue = new java.util.concurrent.SynchronousQueue[Option[T]] 
    val callbackthread = new Runnable { 
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) } 
    } 
    new Thread(callbackthread).start() 
    Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get) 
} 
+0

由於CPS的限制,這可能是Scala達到v2.8的唯一解決方案。不幸的是,它比使用純發電機慢170倍。請參閱https://gist.github.com/a79c0a9669eea3d47eee – 2010-09-27 19:17:55

2

這裏的一個基於延續分隔實施方式中,適於從@Geoff蘆葦叢的供:

import Stream._ 
import scala.util.continuations._ 
import java.util.concurrent.SynchronousQueue 

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset { 
    val queue = new SynchronousQueue[Option[A]] 
    queue.put(Some(shift { k: (A=>Unit) => 
     new Thread() { 
      override def run() { 
       data(k) 
       // when (if) the data source stops pumping, add None 
       // to signal that the stream is dead 
       queue.put(None) 
      } 
     }.start() 
     continually(queue.take).takeWhile(_.isDefined).map(_.get) 
    }) 
} 
9

EDITED:修改的實施例中,以顯示遍歷的懶惰.view

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {  
    | println("Generating " + i) 
    | f(i) 
    | } 
data: (f: (Int) => Unit)Unit 

scala> def toTraversable[T](func : (T => Unit) => Unit) = new Traversable[T] { 
    | def foreach[X](f : T => X) = func(f(_) : Unit)      
    | }                  
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T] 

toTraversable方法會將您的數據函數轉換爲Traversable集合。本身並沒有什麼大的,但是你可以把它轉換成一個懶惰的TraversableView。這裏有一個例子:

scala> toTraversable(data).view.take(3).sum 
Generating 1 
Generating 2 
Generating 3 
Generating 4 
res1: Int = 6 

取方法的本質不幸的是,它必須去一個過去產生的正常工作的最後一個值,但它會提前終止。上述代碼在沒有「.view」調用的情況下看起來相同。但是,這裏有一個更引人注目的例子:

scala> toTraversable(data).view.take(2).foreach(println) 
Generating 1 
1 
Generating 2 
2 
Generating 3 

所以在最後,我相信你正在尋找的集合是TraversableView,這是最容易產生視做一個Traversable,然後調用「視圖」就可以了。如果你真的想要Stream類型,這裏有一個在2.8.0中可用的方法。最後,將讓「流」無緒:

scala> def dataToStream(data : (Int => Unit) => Unit) = { 
    | val x = new Traversable[Int] {      
    |  def foreach[U](f : Int => U) = {     
    |  data(f(_) : Unit)        
    |  } 
    | } 
    | x.view.toList.toStream        
    | } 
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int] 

scala> dataToStream(data) 
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?) 

這種方法的本質不幸的是,它會遍歷整個穿越使得流之前。這也意味着所有的值都需要緩存在內存中。唯一的選擇是訴諸線程。另外:這是爲什麼喜歡Traversables作爲scalax.io.File方法的直接返回的動機:「lines」「chars」和「bytes」。

+0

正如您所看到的,數據首先被評估,然後轉換爲Stream。所以這裏沒有懶惰。 – 2010-09-29 18:46:25

+0

我的觀點是,如果您使用TraversableView,您可以與數據交互作爲「流」。通過要求類型「流」你自己限制。 TraversableView *是*懶惰。 – jsuereth 2010-09-29 20:54:53

+0

如果遍歷視圖在REPL中看起來不懶惰,這是因爲REPL在結果表達式上調用「toString」,這將導致TraversableView遍歷整個集合(顯示所有值)。如果你使用TraversableView開發一個函數,你會看到它的懶惰。 – jsuereth 2010-09-29 20:56:26