2017-06-01 79 views
0

我從Kafka主題接收二進制Avro文件,我必須反序列化它們。在Kafka收到的消息中,我可以在每條消息的開始處看到一個架構。我知道不嵌入模式並將其與實際的Avro文件分開是更好的做法,但我無法控制製作人,我無法更改。我如何從嵌入架構的Kafka反序列化Avro

我的代碼運行在Apache Storm上。首先,我創建一個讀者:

mDatumReader = new GenericDatumReader<GenericRecord>(); 

後來我嘗試反序列化消息沒有宣佈架構:

Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null); 
GenericRecord payload = mDatumReader.read(null, decoder); 

但是當一個消息到達我得到一個錯誤:

Caused by: java.lang.NullPointerException: writer cannot be null! 
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:77) ~[stormjar.jar:?] 
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:46) ~[stormjar.jar:?] 
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[stormjar.jar:?] 
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:122) ~[stormjar.jar:?] 
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:137) ~[stormjar.jar:?] 

我見過的所有答案都是關於使用其他格式,改變傳遞給Kafka或其他內容的消息。我無法控制這些事情。

我的問題是,給定bytes[]與二進制消息內嵌入模式的消息,如何反序列化該Avro文件,而無需聲明模式,以便我可以讀取它。

回答

0

對於DatumReader/Writer,沒有像嵌入式模式那樣的東西。第一次看Avro & Kafka時,我一直是我的誤解。但是Avro Serializer的源代碼清楚地顯示了在使用GenericDatumWriter時沒有嵌入架構。

這是數據文件寫入者在文件的開頭寫入架構,然後使用GenericDatumWriter添加GenericRecords。

既然你說在開始時有一個模式,我假設你可以讀取它,把它變成一個Schema對象,然後將它傳遞給GenericDatumReader(模式)構造函數。 知道消息如何序列化會很有趣。也許DataFileWriter用於寫入字節[]而不是實際文件,那麼你可以使用DataFileReader來反序列化數據?