
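NullPointerException / Not Found exception when I try to process and sink data in Avro format

I am using processors to consume byte-array data from a topic (with byte-array serdes), convert it into generic records (based on a schema I obtain from an HTTP GET request), and send the records to a topic that uses Avro with the schema registry.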

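I have no problem retrieving the schema from the HTTP GET request and mapping my data onto it to produce generic records that follow the schema. However, when I try to sink them to the topic, I get a NullPointerException: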

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at streamProcessor.XXXXprocessor.process(XXXXprocessor.java:80)
    at streamProcessor.XXXXprocessor.process(XXXXprocessor.java:1)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:111)
    at streamProcessor.SelectorProcessor.process(SelectorProcessor.java:33)
    at streamProcessor.SelectorProcessor.process(SelectorProcessor.java:1)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

This is my topology code:

// Stream properties
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-kafka-streams234");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxxxxxxxxxxxx:xxxx");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

// Build topology
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("messages-source", "mytest2");
builder.addProcessor("selector-processor", () -> new SelectorProcessor(), "messages-source");
builder.addProcessor("XXXX-processor", () -> new XXXXprocessor(), "selector-processor");
builder.addSink("XXXX-sink", "XXXXavrotest", new KafkaAvroSerializer(),
        new KafkaAvroSerializer(), "XXXX-processor");

// Start streaming
KafkaStreams streaming = new KafkaStreams(builder, config);
streaming.start();
System.out.println("processor streaming...");

After some reading on Q&A forums, I found that I might need to inject a client when I create the KafkaAvroSerializer instances, so I changed that line to:

SchemaRegistryClient client = new CachedSchemaRegistryClient(
        "xxxxxxxxxxxxxxxxxxxxxx:xxxx/subjects/xxxxschemas/versions", 1000);
builder.addSink("XXXX-sink", "XXXXavrotest", new KafkaAvroSerializer(client),
        new KafkaAvroSerializer(client), "XXXX-processor");

This results in an HTTP 404 Not Found exception...


I guess you don't need the client, but you do need to add the schema registry URL to your StreamsConfig: `config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "your-URL");`


Compare: https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java
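
A minimal sketch of the first comment's suggestion, not the asker's actual code. The class `SerializerConfigSketch`, the helper `buildSerializerConfig`, and the URL `http://localhost:8081` are hypothetical. The property name `schema.registry.url` is the string behind `AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG`. To stay runnable without the Confluent jars, the sketch only builds the settings map; the `KafkaAvroSerializer.configure(Map, boolean)` call that would consume it is shown in a comment.

```java
import java.util.HashMap;
import java.util.Map;

final class SerializerConfigSketch {

    // Builds the settings a KafkaAvroSerializer needs to locate the schema registry.
    // "schema.registry.url" is the value of SCHEMA_REGISTRY_URL_CONFIG.
    static Map<String, Object> buildSerializerConfig(String registryBaseUrl) {
        if (registryBaseUrl == null || registryBaseUrl.isEmpty()) {
            throw new IllegalArgumentException("schema registry URL must be set");
        }
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", registryBaseUrl);
        return config;
    }

    public static void main(String[] args) {
        // Hypothetical registry address; replace with the real base URL.
        Map<String, Object> config = buildSerializerConfig("http://localhost:8081");

        // With the Confluent serializer on the classpath, one would then write:
        //   KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer();
        //   valueSerializer.configure(config, false); // false = value serializer
        System.out.println(config);
    }
}
```

Serializer instances handed directly to `addSink` may need to be configured by hand in this way, since (as far as I can tell) Kafka Streams only configures the serdes it instantiates itself from `StreamsConfig`. An unconfigured serializer has no registry client, which would be consistent with the NullPointerException in `serializeImpl`.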

Answer


I had my URL wrong :P
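
One plausible reading of the 404 in the question (an assumption; the answer does not say which part of the URL was wrong): `CachedSchemaRegistryClient` expects the registry's base URL and appends REST paths such as `/subjects/<subject>/versions` itself, so a URL that already ends in such a path points requests at a resource that does not exist. The sketch below uses only `java.net.URI`; the class `RegistryUrlSketch`, the helper `registryBaseUrl`, and the host name are hypothetical.

```java
import java.net.URI;

final class RegistryUrlSketch {

    // Reduces any schema registry endpoint URL to scheme://host[:port].
    // Assumes the registry is served from the root path of that host.
    static String registryBaseUrl(String anyRegistryUrl) {
        URI uri = URI.create(anyRegistryUrl);
        if (uri.getScheme() == null || uri.getHost() == null) {
            throw new IllegalArgumentException(
                    "expected an absolute URL such as http://host:8081, got: " + anyRegistryUrl);
        }
        String port = uri.getPort() == -1 ? "" : ":" + uri.getPort();
        return uri.getScheme() + "://" + uri.getHost() + port;
    }

    public static void main(String[] args) {
        // Hypothetical endpoint that already contains a REST path.
        String endpoint = "http://registry.example.com:8081/subjects/my-subject/versions";
        System.out.println(registryBaseUrl(endpoint));
    }
}
```

A URL without a scheme (for example `host:8081/subjects/...`) is rejected by the check above, because `java.net.URI` cannot extract a host from it.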

Also, because of my topic's cleanup.policy setting, the key has to be initialized to something other than null.
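
A small sketch of the non-null key requirement, assuming the topic's cleanup.policy is `compact` (the answer does not name the value); log compaction works per key, so records without a key are rejected. The class `NonNullKeySketch`, the helper `keyOrFallback`, and the fallback value are hypothetical; how a meaningful key is derived depends on the data.

```java
import java.util.Objects;

final class NonNullKeySketch {

    // Returns the record key, or the fallback when the incoming key is null,
    // so that the record forwarded to the sink always carries a key.
    static String keyOrFallback(String key, String fallbackKey) {
        Objects.requireNonNull(fallbackKey, "fallbackKey");
        return key != null ? key : fallbackKey;
    }

    public static void main(String[] args) {
        // Hypothetical fallback; a real key would usually come from the record itself.
        String key = keyOrFallback(null, "unknown-source");

        // Inside a Processor this would be followed by something like:
        //   context.forward(key, genericRecord);
        System.out.println(key);
    }
}
```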