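I am using Flink's DataSet API, and I get a NullPointerException (NPE) when Flink groups case class objects. I have the following case classes: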
case class Geo(country:Int, province:Int, city:Int, county:Int)
case class AntiFraudLog(
  eventType: Int,
  valid: Boolean
)
case class AntiFraudSession(fraudLogs: Seq[AntiFraudLog])
Then I generate key/value pairs whose values are instances of the case class.
val dataKeyValue: DataSet[(Long, AntiFraudLog)]
Next, I try to group the items that share the same key:
val groupedSortedData = dataKeyValue groupBy 0
Then I convert the grouped data into another case class:
val sessionData: DataSet[AntiFraudSession] = groupedSortedData reduceGroup(
  logs => AntiFraudSession(logs.map(_._2).toSeq)
)
But when I run the program, I get the following exception:
Caused by: java.lang.NullPointerException
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:90)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:32)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.api.scala.DataSet$$anon$5$$anonfun$flatMap$1.apply(DataSet.scala:417)
at org.apache.flink.api.scala.DataSet$$anon$5$$anonfun$flatMap$1.apply(DataSet.scala:417)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.flink.api.scala.DataSet$$anon$5.flatMap(DataSet.scala:417)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:163)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Does anyone know how to fix it?