2016-09-16 162 views
0

我正在調查Apache Ignite流如何工作。我有兩個節點集羣設置(都在本地主機上),並啓動一個客戶端節點,它使用StreamTransformer和EntryProcessor運行流式處理代碼。因此,我得到的一個節點無法反序列化異常。我的代碼被簡化字計數從例如點燃文檔:Apache Ignite:與數據流相關的序列化錯誤

public class StreamingExample {` 
public static class StreamingExampleCacheEntryProcessor implements CacheEntryProcessor<String, Long, Object> { 
    @Override 
    public Object process(MutableEntry<String, Long> e, Object... arg) throws EntryProcessorException { 
     Long val = e.getValue(); 
     e.setValue(val == null ? 1L : val + 1); 
     return null; 
    } 
} 

public static void main(String[] args) throws IgniteException, IOException { 
    Ignition.setClientMode(true); 
    try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { 
     IgniteCache<String, Long> stmCache = ignite.getOrCreateCache("mycache"); 
     try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) { 
      stmr.allowOverwrite(true); 
      stmr.receiver(StreamTransformer.from(new StreamingExampleCacheEntryProcessor())); 
      stmr.addData("word", 1L); 
      System.out.println("Finished"); 
     } 
    } 
} 

}

異常我得到一個兩個節點中的一個是

[23時38分23秒]拓撲快照[版本= 5 ,服務器= 2,客戶端= 1,CPU = 4,堆= 3.3GB] 線程「pub-#9%null%」類中的異常org.apache.ignite.binary.BinaryObjectException:未能使用優化編組器解組對象 在org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1595) 在org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1663) 在org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:298) 在org.apache。 ignit.internal.binary.BinaryMarshaller.unmarshal(BinaryMarshaller.java:109) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:278) at org.apache.ignite.internal。 processors.datastreamer.DataStreamProcessor.access $ 000(DataStreamProcessor.java:50) at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor $ 1.onMessage(DataStreamProcessor.java:80) at org.apache.ignite.internal。 managers.communication.GridIoManager.invokeListener(GridIoManager.java:1238) at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:866) at org.apache.ignite.internal.managers.communication.GridIoManager.access $ 1700(GridIoManager.java:106) 在org.apache.ignite.internal.managers.communication.GridIoManager $ 5.run(GridIoManager.java:829) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent。 ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 引起:class org.apache.ignite.IgniteCheckedException:無法找到具有給定類加載器的類反編組(確保所有類的相同版本都標註在所有節點上或啓用同級加載):sun.mi [email protected] 在org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:224) 在org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1592 ) ...... 13多個 造成的:拋出java.lang.ClassNotFoundException:在 在java.net.URLClassLoader的$ 1.run(URLClassLoader.java:366)gridgaingames.StreamingExample $ StreamingExampleCacheEntryProcessor java.net.URLClassLoader的$ 1.run( URLClassLoader.java:355) 在java.security.AccessController.doPrivileged(本機方法) 在java.net.URLClassLoader.findClass(URLClassLoader.java:354) 在java.lang.ClassLoader.loadClass(ClassLoader.java:425 ) at sun.misc.Lau在java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName (Class.java:274) at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8350) at org.apache.ignite.internal.MarshallerContextAdapter.getClass(MarshallerContextAdapter。的java:185) 在org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:266) 在org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:318) 在java.io.ObjectInputStream.readObject(ObjectInputStream.java:364) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream。 readSerializable(OptimizedObjectInputStream.java:579) at org.apache.ignite.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:841) at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(Optimi zedObjectInputStream.java:324) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364) at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:218) ... 14 more

有幾件事情我不能。

1)我怎樣才能解決呢?

2)由於這不是不「廣播」什麼的,我應該點燃運行碼流只調用節點。看起來我錯了。那麼我的Streaming代碼在哪裏執行?

3)印刷後「完成」行我的代碼不會停止。爲什麼?看起來像一些非守護線程仍然存在。這是防止我的客戶端節點退出的Streaming代碼嗎?

PS

同行類加載啓用。如果我運行一些廣播的例子,它在許多節點上執行代碼 - 它工作正常。

回答

3

基本上IgniteDataStreamer在發送方(客戶端在您的示例中)準備數據批處理,並立即將它們發送到應存儲特定鍵值元組的目標節點。記住這個問題的答案如下:

  1. 在條目被放入緩存之前,目標節點(服務器節點)上執行變換器。這意味着服務器節點必須在其類路徑中包含變換器的類,或者,您必須啓用對等類加載。就個人而言,後者是更靈活和更可取的解決方案。
  2. 正如上面所解釋的,發件人只需準備發送到部署緩存的所有服務器的批次。服務器僅接收那些包含元數據的批處理,服務器是主數據庫或備用數據庫。
  3. 批次的沖洗發生在後臺,因爲IgniteDataStreamer用於快速數據預加載或複雜流處理(CEP)。有一些可以讓你調整沖洗參數 - autoFlustFrequencyperNodeBufferSize

最後,對於預加載的需求(當緩存是空的,你需要填寫起來)我會建議設置allowOverwritefalse這將使流光準備和主,備節點分別發送批次。如果此參數設置爲true,則僅在主節點上發送批次,而在更新其數據版本和相應備份後,主節點將使用基本cache.put操作注入數據。如果你只需要預加載你的緩存,這種方法會比較慢。

+0

謝謝,dmagda!但是我找不到關於ClassNotFoundException的主要問題的答案。啓用PeerToPeer類加載(正如您看到的,我正在使用Ignite附帶的配置之一 - examples/config/example-ignite.xml)。我仍然得到這個錯誤。我有一個猜測,可能是Transformer的代碼是在節點實際接收到Transformer的類之前啓動的。這是根源,還是別的?以及如何解決它? – MiamiBeach

+0

我設法重現了這個問題(https://issues.apache.org/jira/browse/IGNITE-3935)。感謝舉報!請參閱故障單中的建議解決方法。 – dmagda