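Scalding: flatten fields after groupBy
I saw this: Scalding: How to retain the other field, after a groupBy('field){.size}?
It's a real pain and a mess compared to Apache Pig... What am I doing wrong? Can I do something like Pig's GENERATE(FLATTEN())?
I'm confused. Here is my Scalding code: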
def takeTop(topAmount: Int) :Pipe = self
.groupBy(person1){ _.sortedReverseTake[Long](activityCount -> top, topAmount)}
.flattenTo[(Long, Long, Long)](top -> (person1, person2, activityCount))
And here is my test:
"Take top 3" should "return most active pairs" in {
Given{
List((1, 13, 7),
(1, 13, 8),
(1, 12, 9),
(1, 11, 10),
(2, 20, 21),
(2, 20, 22)) withSchema (person1, person2, activityCount)
} When {
pipe:RichPipe => pipe.takeTop(3)
} Then {
buffer: mutable.Buffer[(Long, Long, Long)] =>
println(buffer.toList)
buffer.toList.size should equal(5)
println (buffer.toList)
buffer.toList should contain (1, 11, 10)
buffer.toList should contain (1, 12, 9)
buffer.toList should contain (1, 13, 8)
buffer.toList should not contain (1, 13, 7)
buffer.toList should contain (2, 20, 21)
buffer.toList should contain (2, 20, 22)
}
}
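For comparison, the behaviour the test expects can be modelled with plain Scala collections, with no Scalding involved: group the rows by person1, keep the topAmount rows with the highest activityCount, and flatten back to (person1, person2, activityCount). This is only a sketch of the intended semantics; TakeTopModel and takeTopLocal are hypothetical names, not part of Scalding.

```scala
object TakeTopModel {
  // (person1, person2, activityCount), matching the test's schema
  type Row = (Long, Long, Long)

  // Hypothetical in-memory equivalent of takeTop: for each person1,
  // keep the topAmount rows with the largest activityCount.
  def takeTopLocal(rows: Seq[Row], topAmount: Int): List[Row] =
    rows
      .groupBy(_._1) // like groupBy(person1)
      .values
      .flatMap(group => group.sortBy(_._3)(Ordering[Long].reverse).take(topAmount))
      .toList

  def main(args: Array[String]): Unit = {
    val input: Seq[Row] = Seq(
      (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L),
      (1L, 11L, 10L), (2L, 20L, 21L), (2L, 20L, 22L))
    // Sorted only to make the printed order deterministic
    println(takeTopLocal(input, 3).sorted)
    // prints List((1,11,10), (1,12,9), (1,13,8), (2,20,21), (2,20,22))
  }
}
```

Person 1 has four rows, so only three survive; person 2 has two, so both survive, giving the five rows the test asserts.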
And I get this runtime exception:
14/09/23 15:25:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [com.twitter.scalding.T...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:478)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47)
at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
at com.twitter.scalding.MRMAggregator.complete(Operations.scala:321)
at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:151)
at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:113)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple3
at com.twitter.scalding.GeneratedTupleSetters$$anon$25.apply(GeneratedConversions.scala:669)
at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47)
at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:46)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 23 more
What am I doing wrong?
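The root cause in the trace is the tuple setter (GeneratedTupleSetters) failing inside flattenTo. A plausible reading: sortedReverseTake[Long](activityCount -> top, topAmount) reads only the activityCount field, so top holds a list of bare Long counts, while flattenTo[(Long, Long, Long)] expects every element to be a 3-tuple. The plain-Scala sketch below reproduces the same kind of cast failure outside Scalding; CastFailureModel and castsToTuple3 are hypothetical names used only for illustration.

```scala
object CastFailureModel {
  // Returns true if every element can be used as a (Long, Long, Long),
  // which is what a (Long, Long, Long) tuple setter needs from each element.
  def castsToTuple3(values: List[Any]): Boolean =
    try {
      val counts = values.map(v => v.asInstanceOf[(Long, Long, Long)]._3)
      counts.length == values.length
    } catch {
      case _: ClassCastException => false
    }

  def main(args: Array[String]): Unit = {
    // Roughly what `top` holds for person1 = 1 after
    // sortedReverseTake[Long](activityCount -> top, 3): bare counts, no tuples.
    val top: List[Any] = List(10L, 9L, 8L)
    println(castsToTuple3(top))                  // false: java.lang.Long is not a Tuple3
    println(castsToTuple3(List((10L, 1L, 11L)))) // true: elements are real tuples
  }
}
```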
UPDATE:
This is what I ended up doing:
def takeTop(topAmount: Int) :Pipe = self
.groupBy(person1){ _.sortedReverseTake[(Long,Long, Long)]((activityCount, person1, person2) -> top, topAmount)}
.flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
.project(person1, person2, activityCount)
The test passes, but I'm not sure it's a good way to do it...
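One way to see why the update works: with T = (Long, Long, Long) and input fields (activityCount, person1, person2), top now holds whole tuples, and Scala's implicit Ordering for a 3-tuple compares the first element first, so putting activityCount first makes it the primary sort key (ties fall back to person1, then person2). The final .project only reorders the fields to (person1, person2, activityCount), which the test reads positionally. The sketch below models those two steps with plain collections for the person1 = 1 group; TupleOrderingModel, topByCount and toOutputOrder are hypothetical names, and this is an illustration of the ordering, not Scalding's implementation.

```scala
object TupleOrderingModel {
  // Rows in the order the update feeds them: (activityCount, person1, person2).
  // Ordering[(Long, Long, Long)] is lexicographic, so activityCount dominates.
  def topByCount(rows: Seq[(Long, Long, Long)], k: Int): List[(Long, Long, Long)] =
    rows.sorted(Ordering[(Long, Long, Long)].reverse).take(k).toList

  // Mirrors .project(person1, person2, activityCount): move the count to the end.
  def toOutputOrder(t: (Long, Long, Long)): (Long, Long, Long) = (t._2, t._3, t._1)

  def main(args: Array[String]): Unit = {
    val group1 = Seq((7L, 1L, 13L), (8L, 1L, 13L), (9L, 1L, 12L), (10L, 1L, 11L))
    println(topByCount(group1, 3).map(toOutputOrder))
    // prints List((1,11,10), (1,12,9), (1,13,8))
  }
}
```

The trade-off is that the sort key is encoded in the field order rather than stated explicitly, which is why the extra reordering step is needed afterwards.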