2014-09-23 74 views

Scalding, flatten fields after groupBy

I have read this question: Scalding: How to retain the other field, after a groupBy('field){.size}?

This is a real pain and a mess compared to Apache Pig... What am I doing wrong? Is there something like Pig's GENERATE(FLATTEN())?

I'm confused. Here is my Scalding code:

def takeTop(topAmount: Int) :Pipe = self 
    .groupBy(person1){ _.sortedReverseTake[Long](activityCount -> top, topAmount)} 
    .flattenTo[(Long, Long, Long)](top -> (person1, person2, activityCount)) 

And here is my test:

"Take top 3" should "return most active pairs" in { 
    Given{ 
     List((1, 13, 7), 
      (1, 13, 8), 
      (1, 12, 9), 
      (1, 11, 10), 
      (2, 20, 21), 
      (2, 20, 22)) withSchema (person1, person2, activityCount) 
    } When { 
     pipe:RichPipe => pipe.takeTop(3) 
    } Then { 
     buffer: mutable.Buffer[(Long, Long, Long)] => 
     println(buffer.toList) 
     buffer.toList.size should equal(5) 
     println (buffer.toList) 

     buffer.toList should contain (1, 11, 10) 
     buffer.toList should contain (1, 12, 9) 
     buffer.toList should contain (1, 13, 8) 
     buffer.toList should not contain (1, 13, 7) 

     buffer.toList should contain (2, 20, 21) 
     buffer.toList should contain (2, 20, 22) 
    } 
    } 

And I get a runtime exception:

14/09/23 15:25:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing 
cascading.pipe.OperatorException: [com.twitter.scalding.T...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:478)] operator Each failed executing operation 
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107) 
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39) 
    at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47) 
    at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67) 
    at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145) 
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133) 
    at com.twitter.scalding.MRMAggregator.complete(Operations.scala:321) 
    at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:151) 
    at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39) 
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51) 
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28) 
    at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:113) 
    at cascading.flow.stream.Duct.complete(Duct.java:81) 
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296) 
    at cascading.flow.stream.Duct.complete(Duct.java:81) 
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296) 
    at cascading.flow.stream.SourceStage.map(SourceStage.java:105) 
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53) 
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38) 
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:138) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) 
    at java.lang.Thread.run(Thread.java:662) 
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple3 
    at com.twitter.scalding.GeneratedTupleSetters$$anon$25.apply(GeneratedConversions.scala:669) 
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47) 
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:46) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46) 
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99) 
    ... 23 more 

What am I doing wrong?
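A plain-Scala sketch of why the cast appears to fail. It models the reducer with ordinary collections rather than Scalding, and it assumes that the fields-API `sortedReverseTake` stores only the field(s) named on the left of `->` in `top`. Under that assumption each group's `top` is just a list of `Long` activity counts, so `flattenTo[(Long, Long, Long)]` receives `Long` values where it expects `Tuple3`s, which is consistent with the `java.lang.Long cannot be cast to scala.Tuple3` cause in the trace. `OriginalShape` is a hypothetical name used only for illustration.

```scala
// Plain-Scala model (no Scalding) of what the original takeTop produces.
// Assumption: rows are (person1, person2, activityCount), as in the test,
// and sortedReverseTake keeps only the listed field (activityCount) in `top`.
object OriginalShape {
  val rows: List[(Long, Long, Long)] = List(
    (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L),
    (1L, 11L, 10L), (2L, 20L, 21L), (2L, 20L, 22L))

  // Models: groupBy(person1) { _.sortedReverseTake[Long](activityCount -> top, 3) }
  // Only activityCount goes into `top`, so each group yields a List[Long].
  val top: Map[Long, List[Long]] =
    rows.groupBy(_._1).map { case (p1, rs) =>
      p1 -> rs.map(_._3).sorted(Ordering[Long].reverse).take(3)
    }

  def main(args: Array[String]): Unit = {
    // flattenTo[(Long, Long, Long)] expects each element to be a Tuple3,
    // but the elements of `top` are plain Longs.
    val elem: Any = top(1L).head
    println(elem)                                 // 10
    println(elem.isInstanceOf[Product3[_, _, _]]) // false
  }
}
```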

UPD:

This is what I did:

def takeTop(topAmount: Int) :Pipe = self 
    .groupBy(person1){ _.sortedReverseTake[(Long,Long, Long)]((activityCount, person1, person2) -> top, topAmount)} 
    .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2)) 
    .project(person1, person2, activityCount) 

The test passes, but I'm not sure it's a good approach...
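The updated version can be modelled the same way, again with plain Scala collections instead of Scalding and under the same assumption about what `top` holds; `FixedShape` is a hypothetical name. Sorting whole `(activityCount, person1, person2)` tuples puts complete rows into `top`, so flattening yields three fields per row, and the final `map` plays the role of `.project(person1, person2, activityCount)` by restoring the original field order.

```scala
// Plain-Scala model (no Scalding) of the updated takeTop.
// Assumption: rows are (person1, person2, activityCount), as in the test.
object FixedShape {
  type Row = (Long, Long, Long)

  def takeTop(rows: List[Row], topAmount: Int): List[Row] =
    rows
      .groupBy(_._1) // models groupBy(person1)
      .values
      .toList
      .flatMap { group =>
        group
          // Models sortedReverseTake[(Long, Long, Long)]((activityCount, person1, person2) -> top, n):
          // the whole row goes into `top`, ordered by activityCount first.
          .map { case (p1, p2, count) => (count, p1, p2) }
          .sorted(Ordering[(Long, Long, Long)].reverse)
          .take(topAmount)
      }
      // Models flattenTo(top -> (activityCount, person1, person2)) followed by
      // project(person1, person2, activityCount): restore the original field order.
      .map { case (count, p1, p2) => (p1, p2, count) }

  def main(args: Array[String]): Unit = {
    val rows: List[Row] = List(
      (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L),
      (1L, 11L, 10L), (2L, 20L, 21L), (2L, 20L, 22L))
    println(takeTop(rows, 3).sorted)
    // List((1,11,10), (1,12,9), (1,13,8), (2,20,21), (2,20,22))
  }
}
```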

Answer

def takeTop(topAmount: Int) :Pipe = self
    .groupBy(person1){ _.sortedReverseTake[(Long, Long, Long)]((activityCount, person1, person2) -> top, topAmount)}
    .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
    .project(person1, person2, activityCount)

It works; I didn't find a better way.
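Why `activityCount` has to come first in the sorted tuple: assuming the `Ordering` used here is Scala's standard implicit ordering for `(Long, Long, Long)`, tuple comparison is lexicographic. The first element decides, and later elements only break ties. The sketch below (hypothetical object name `TupleOrderingDemo`) shows that with `(activityCount, person1, person2)` the count drives the order, whereas with `(person1, person2, activityCount)` the `person2` field would decide instead; this is also why the answer needs `.project(...)` afterwards to put the fields back in their original order.

```scala
// Scala's implicit Ordering for tuples is lexicographic: it compares the
// first element and only looks at the next one to break a tie.
object TupleOrderingDemo {
  def main(args: Array[String]): Unit = {
    val ord = Ordering[(Long, Long, Long)]
    // (activityCount, person1, person2): activityCount 10 beats 9
    println(ord.compare((10L, 1L, 11L), (9L, 1L, 12L)) > 0) // true
    // (person1, person2, activityCount): person1 ties, person2 11 < 12 decides,
    // so the row with the higher activityCount (10) ranks lower
    println(ord.compare((1L, 11L, 10L), (1L, 12L, 9L)) > 0) // false
  }
}
```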