2016-12-02 69 views
4

來自node.js背景,我是Scala的新手,我嘗試使用Twitter的Future.collect來執行一些簡單的併發操作。但是我的代碼顯示了順序行爲而不是併發行爲。我究竟做錯了什麼?Twitter的Future.collect不能同時工作(Scala)

這裏是我的代碼,

import com.twitter.util.Future 

def waitForSeconds(seconds: Int, container:String): Future[String] = Future[String] { 
    Thread.sleep(seconds*1000) 
    println(container + ": done waiting for " + seconds + " seconds") 
    container + " :done waiting for " + seconds + " seconds" 
} 

def mainFunction:String = { 
    val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All"))) 
    val singleTask = waitForSeconds(1, "Single") 

    allTasks onSuccess { res => 
    println("All tasks succeeded with result " + res) 
    } 

    singleTask onSuccess { res => 
    println("Single task succeeded with result " + res) 
    } 

    "Function Complete" 
} 

println(mainFunction) 

,這是輸出我得到的,

All: done waiting for 1 seconds 
All: done waiting for 3 seconds 
All: done waiting for 2 seconds 
Single: done waiting for 1 seconds 
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds) 
Single task succeeded with result Single :done waiting for 1 seconds 
Function Complete 

我想到的是輸出,

All: done waiting for 1 seconds 
Single: done waiting for 1 seconds 
All: done waiting for 2 seconds 
All: done waiting for 3 seconds 
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds) 
Single task succeeded with result Single :done waiting for 1 seconds 
Function Complete 
+0

那麼'Future'的整個點就是將計算卸載到另一個線程。因此,在mainFunction的第一行中,將產生三個新線程,這些線程(當生成的線程完成時)會產生另一個線程來執行oncomplete函數。 在第二行中,生成另一個線程,並將與其他線程並行執行。所以我認爲實際產出似乎有效... – irundaia

回答

8

Twitter的未來是更加明確,其中計算是比Scala標準庫期貨執行的。特別是,Future.apply將安全地捕獲異常(如s.c.Future),但它沒有說明計算將運行在哪個線程上。在您的情況下,計算在主線程中運行,這就是爲什麼您會看到結果你看到了。

這種方法與標準庫的未來API相比有幾個優點。一方面,它使方法簽名更簡單,因爲不存在必須在各處傳遞的隱含的ExecutionContext。更重要的是,它更容易避免上下文切換(Brian Degenhardt的here's a classic explanation)。在這方面,Twitter的Future更像斯卡拉斯的Task,並且具有基本相同的性能優點(例如在this blog post中描述)。

更明確地指出計算運行的地方在於,您必須更清楚地瞭解計算的運行位置。你的情況,你可以寫這樣的事情:

import com.twitter.util.{ Future, FuturePool } 

val pool = FuturePool.unboundedPool 

def waitForSeconds(seconds: Int, container:String): Future[String] = pool { 
    Thread.sleep(seconds*1000) 
    println(container + ": done waiting for " + seconds + " seconds") 
    container + " :done waiting for " + seconds + " seconds" 
} 

這不會產生正是你要求的輸出(「功能齊全」將首先打印,allTaskssingleTask不是對於測序彼此),但它會在單獨的線程上並行運行任務。

(作爲一個註腳:在上面我的例子FuturePool.unboundedPool是創建一個演示未來池的簡單方法,而且往往是蠻好的,但它是不適合的CPU密集型的計算,看看the FuturePool API docs其他創建未來池的方法將使用您提供的並可以自己管理的ExecutorService。)

+1

非常好! 我用你的代碼,並在最後一行添加了一個Thread.sleep(6000),現在我看到了併發行爲。進行更改後的輸出, '功能Complete' '所有:完成等待1 seconds' '單:做等待1 seconds' :完成等待1 seconds' '單任務與結果單成功'全部:等待2秒鐘' '全部:等待3秒鐘' '所有任務都成功結果ArraySeq(全部:完成等待1秒,全部:完成等待3秒,全部:完成等待2秒秒)' – Ram

+2

添加一點點。 Twitter最近發佈的文檔解釋了他們背後的一些設計決策:https://github.com/twitter/finagle/blob/develop/doc/src/sphinx/developers/Futures.rst –

+1

(com.twitter .util。)Await.result(allTask​​s)可能會更好,而不是Thread.sleep(6000) – n4to4