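Parallel aggregation in Scala using associative operators
I want to aggregate a list of values in Scala. Here are some considerations:
- The aggregation function [1] is associative as well as commutative: examples are addition and multiplication
- The function is applied to the list in parallel so as to use all the cores of the CPU
Here is an implementation: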
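To illustrate why the first consideration matters, here is a minimal sketch (the names `AssociativityCheck` and `pairwise` are hypothetical and not part of my implementation). It reduces neighbouring pairs sequentially, the way a tree-shaped parallel reduction would group them, and compares the result with a left-to-right fold. Because this grouping keeps the elements in order, it relies only on associativity; commutativity would matter only if partial results were combined in arbitrary order.

```scala
object AssociativityCheck {
  // Reduce neighbouring pairs repeatedly, mimicking the grouping of a tree-shaped reduction.
  @annotation.tailrec
  def pairwise[T](xs: List[T])(zero: T)(fn: (T, T) => T): T = xs match {
    case Nil      => zero
    case x :: Nil => x
    case _        => pairwise(xs.grouped(2).map(_.reduce(fn)).toList)(zero)(fn)
  }

  def main(args: Array[String]): Unit = {
    val xs = List(1, 2, 3, 4)
    // Addition is associative: both groupings agree.
    println(xs.foldLeft(0)(_ + _) == pairwise(xs)(0)(_ + _)) // true
    // Subtraction is not associative: the groupings disagree.
    println(xs.reduceLeft(_ - _))   // ((1 - 2) - 3) - 4 = -8
    println(pairwise(xs)(0)(_ - _)) // (1 - 2) - (3 - 4) = 0
  }
}
```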
package com.example.reactive
import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object AggregateParallel {
  // Combines neighbouring pairs of futures, then recurses on the shorter
  // list until a single future remains.
  private def pm[T](l: List[Future[T]])(zero: T)(fn: (T, T) => T): Future[T] = {
    val l2 = l.grouped(2).map {
      case x :: Nil      => x
      case x :: y :: Nil => for (a <- x; b <- y) yield fn(a, b)
      case _             => Future.successful(zero)
    }.toList
    l2 match {
      case Nil      => Future.successful(zero)
      case x :: Nil => x
      case _        => pm(l2)(zero)(fn)
    }
  }
  def parallelAggregate[T](l: List[T])(zero: T)(fn: (T, T) => T): T = {
    val n = pm(l.map(Future(_)))(zero)(fn)
    // Await.result returns the value itself, or throws on failure or timeout
    Await.result(n, 1000.millis)
  }
  def main(args: Array[String]): Unit = {
    // multiply empty list: zero value is 1
    println(parallelAggregate(List[Int]())(1)((x, y) => x * y))
    // multiply a list: zero value is 1
    println(parallelAggregate(List(1, 2, 3, 4, 5))(1)((x, y) => x * y))
    // sum a list: zero value is 0
    println(parallelAggregate(List(1, 2, 3, 4, 5))(0)((x, y) => x + y))
    // sum a short list of BigInt: zero value is 0
    val bigList1 = List(1, 2, 3, 4, 5).map(BigInt(_))
    println(parallelAggregate(bigList1)(0)((x, y) => x + y))
    // sum a list of BigInt: zero value is 0
    val bigList2 = (1 to 100).map(BigInt(_)).toList
    println(parallelAggregate(bigList2)(0)((x, y) => x + y))
    // multiply a list of BigInt: zero value is 1
    val bigList3 = (1 to 100).map(BigInt(_)).toList
    println(parallelAggregate(bigList3)(1)((x, y) => x * y))
  }
}
OUTPUT:
1
120
15
15
5050
93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000
How else could I achieve the same goal in Scala, or how could I improve this code?
EDIT1:
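I have implemented a bottom-up aggregation. I think I am very close to the aggregate method in Scala (see below). The difference is that I only split into sub-lists of two elements:
Scala implementation: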
def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
  executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
}
With this implementation I assume that the aggregation happens in parallel like this:
List(1,2,3,4,5,6)
-> split parallel -> List(List(1,2), List(3,4), List(5,6))
-> execute in parallel -> List(3, 7, 11)
-> split parallel -> List(List(3,7), List(11))
-> execute in parallel -> List(10, 11)
-> Result is 21
Is it correct to assume that Scala's aggregate is also doing a bottom-up aggregation in parallel?
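The rounds traced above can be reproduced with a small sequential sketch (the names `BottomUpTrace` and `rounds` are hypothetical). It only shows the pairwise grouping I am assuming; it runs on a single thread and says nothing about how the library's aggregate actually splits its work.

```scala
object BottomUpTrace {
  // Returns every intermediate list produced by repeatedly combining neighbouring pairs.
  def rounds[T](xs: List[T])(fn: (T, T) => T): List[List[T]] =
    if (xs.size <= 1) List(xs)
    else xs :: rounds(xs.grouped(2).map(_.reduce(fn)).toList)(fn)

  def main(args: Array[String]): Unit =
    // Prints, one per line:
    // List(1, 2, 3, 4, 5, 6), List(3, 7, 11), List(10, 11), List(21)
    rounds(List(1, 2, 3, 4, 5, 6))(_ + _).foreach(println)
}
```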
[1] http://www.mathsisfun.com/associative-commutative-distributive.html
Scala's parallel collections already have an 'aggregate' method that you can adapt to your requirements. http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/ – gwenzek 2014-09-24 07:29:58
Please check my EDIT1 in the question. – tuxdna 2014-09-24 07:56:49
I answered your edit – gwenzek 2014-09-24 10:46:14