我從那裏我讀出的數據如下卡夫卡隊列:如何將記錄從卡夫卡傳遞給方法?
private static void startKafkaConsumerStream() {
try {
System.out.println("Print method: startKafkaConsumerStream");
Dataset<String> lines = (Dataset<String>) _spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
.option("subscribe", HTTP_FED_VO_TOPIC)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
StreamingQuery query = lines.writeStream()
.outputMode("append")
.format("console")
.start();
query.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
的要求:與上面的代碼,我能打印記錄卻安慰,我受到損害,如怎麼辦我將這些傳遞給一個可以處理它們的方法。
要做到這一點,我試着在文檔中查找,但無法找到任何相關的。由於我是這個新手,這聽起來可能有點愚蠢。不過,我卡住了,並會高度讚賞任何提示。
目標應用該應用的目的的是接受請求,並傳送給卡夫卡,然後在單獨的線程卡夫卡讀取器被實現,其負責讀取並處理該請求,併產生輸出到另一個卡夫卡隊列。我只是在實施這個,架構不是我的想法。
是否有明確的'append'輸出模式的特定原因?這是默認的輸出模式,因此我在問。 –
不早一點,我想要完成的嘖嘖我只是對Kafka中可用的最新數據感興趣 – User3
_「實現了一個負責讀取和處理請求的kafka閱讀器」_ < - 您在「處理請求」部分中究竟想做什麼? –