2015-09-08

While using Apache Flink with the following code:

DataStream<List<String>> result = source
    .window(Time.of(1, TimeUnit.SECONDS))
    .mapWindow(new WindowMapFunction<String, List<String>>() {

        @Override
        public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
            List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
            collector.collect(top5);
        }
    })
    .flatten();

I get this exception:

Caused by: java.lang.UnsupportedOperationException 
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) 
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211) 
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110) 
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41) 
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) 
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125) 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127) 
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) 
    at java.lang.Thread.run(Thread.java:745) 

How can I use an UnmodifiableCollection with Flink?

Answer

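The problem is that Kryo's default CollectionSerializer cannot deserialize the collection: it rebuilds a collection by calling .add() for each element, and that call fails because the collection is unmodifiable.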
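To make the failure mode concrete, here is a minimal sketch in plain Java, with no Flink or Kryo involved. The class `UnmodifiableAddDemo` and its `refill` method are hypothetical names introduced for illustration; `refill` only imitates, in simplified form, the element-by-element `add()` loop that shows up in the stack trace at `CollectionSerializer.read`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class UnmodifiableAddDemo {

    // Simplified stand-in (an assumption, not Kryo's actual code) for a generic
    // collection deserializer: add the decoded elements to the target one by one.
    // Returns false if the target rejects the add() call.
    static boolean refill(Collection<String> target, Collection<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        Collection<String> elements = Arrays.asList("a", "b", "c");

        Collection<String> mutable = new ArrayList<>();
        Collection<String> readOnly =
                Collections.unmodifiableCollection(new ArrayList<String>());

        System.out.println("ArrayList refilled: " + refill(mutable, elements));
        System.out.println("Unmodifiable view refilled: " + refill(readOnly, elements));
    }
}
```

The mutable list accepts the elements, while the unmodifiable view throws UnsupportedOperationException on the first add(), which is the exception reported in the question.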

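To resolve the issue, we can use the UnmodifiableCollectionsSerializer from the kryo-serializers project. Flink transitively depends on that project, so there is no need to add it as a dependency.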

Next, we have to register the serializer with Flink's Kryo instance.

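// UnmodifiableCollectionsSerializer lives in de.javakaffee.kryoserializers (kryo-serializers project).
// Class.forName() throws the checked ClassNotFoundException, so declare or handle it.
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);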

Usually we do not have to call Class.forName() to register a serializer, but in this case java.util.Collections$UnmodifiableCollection is package-private, so we cannot reference the class directly.
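The point about visibility can be checked with a small standalone sketch using only the JDK. The class name `UnmodifiableClassLookup` and the helper `lookup` are hypothetical, chosen for this example. It loads the class by name, confirms that it is not public, and shows that an unmodifiable list view is an instance of it, which is presumably why a serializer registered for this one class also covers the list returned in the question.

```java
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class UnmodifiableClassLookup {

    // Loads the package-private JDK class by its binary name, since it cannot be
    // written as a class literal outside of java.util.
    static Class<?> lookup() {
        try {
            return Class.forName("java.util.Collections$UnmodifiableCollection");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("JDK class not found", e);
        }
    }

    public static void main(String[] args) {
        Class<?> unmodColl = lookup();

        // The class is not public, so code outside java.util cannot name it directly.
        System.out.println("public: " + Modifier.isPublic(unmodColl.getModifiers()));

        // Unmodifiable list views are subclasses of UnmodifiableCollection.
        List<String> view = Collections.unmodifiableList(new ArrayList<String>());
        System.out.println("list view is instance: " + unmodColl.isInstance(view));
    }
}
```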

Thanks for the insightful and timely answer. Your response speed is amazing :-)