如果您的主題是「String」並且其值爲「Test」,那麼您首先需要通過實施kafka.serializer.Encoder
和kafka.serializer.Decoder
來創建TestEncoder和TestDecoder類。現在,在您的createDirectStream方法,你可以有
JavaPairInputDStream<String, Test> testData = KafkaUtils
.createDirectStream(context, String.class,Test.class ,StringDecoder.class,TestDecoder.class,props,topics);
您可以在https://www.tomsdev.com/blog/2015/storm-kafka-complex-types/
參考KafkaKryoEncoder
在你的卡夫卡生產者,你需要註冊您的自定義編碼器類像
Properties properties = new Properties();
properties.put("metadata.broker.list", brokerList);
properties.put("serializer.class", "com.my.TestEncoder");
Producer<String, Test> producer = new Producer<String, Test>(new ProducerConfig(properties));
Test test = new Test();
KeyedMessage<String, Test> data = new KeyedMessage<String, Test>("myTopic", test);
producer.send(data);
感謝這有助於。 – Bankelaal