0

我從那裏我讀出的數據如下卡夫卡隊列:如何將記錄從卡夫卡傳遞給方法?

private static void startKafkaConsumerStream() { 

     try { 

      System.out.println("Print method: startKafkaConsumerStream"); 

      Dataset<String> lines = (Dataset<String>) _spark 
        .readStream() 
        .format("kafka") 
        .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers")) 
        .option("subscribe", HTTP_FED_VO_TOPIC) 
        .option("startingOffsets", "latest") 
        .load() 
        .selectExpr("CAST(value AS STRING)") 
        .as(Encoders.STRING()); 

      StreamingQuery query = lines.writeStream() 
        .outputMode("append") 
        .format("console") 
        .start(); 

      query.awaitTermination(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

的要求:與上面的代碼,我能打印記錄卻安慰,我受到損害,如怎麼辦我將這些傳遞給一個可以處理它們的方法。

要做到這一點,我試着在文檔中查找,但無法找到任何相關的。由於我是這個新手,這聽起來可能有點愚蠢。不過,我卡住了,並會高度讚賞任何提示。

目標應用該應用的目的的是接受請求,並傳送給卡夫卡,然後在單獨的線程卡夫卡讀取器被實現,其負責讀取並處理該請求,併產生輸出到另一個卡夫卡隊列。我只是在實施這個,架構不是我的想法。

+0

是否有明確的'append'輸出模式的特定原因?這是默認的輸出模式,因此我在問。 –

+0

不早一點,我想要完成的嘖嘖我只是對Kafka中可用的最新數據感興趣 – User3

+0

_「實現了一個負責讀取和處理請求的kafka閱讀器」_ < - 您在「處理請求」部分中究竟想做什麼? –

回答

1

您可以在卡夫卡流應用程序的吸收部使用ForeachWriter[T]處理您的查詢的每一行,像這樣:

datasetOfString.write.foreach(new ForeachWriter[String] { 

    def open(partitionId: Long, version: Long): Boolean = { 
     // open connection 
    } 

    def process(record: String) = { 
     // write string to connection 
    } 

    def close(errorOrNull: Throwable): Unit = { 
     // close the connection 
    } 
    }) 
+0

你是我的明星! – User3

+0

只爲未來的讀者提供一句話:'public abstract void process(T value) 調用來處理執行器端的數據。只有打開時纔會調用此方法返回true。# – User3

1

linesDataset<String>與卡夫卡的行中的值。

如何將這些傳遞給將處理它們的方法。

取決於正是你想要做的,當然你可以使用foreach經營者或使用其他運營商或工作,你可以使用批處理數據集的內容。

您可以使用withColumn(...)selectmap運算符。

換句話說,將Spark結構化流視爲具有流式數據集的Spark SQL。

+0

謝謝傑克,這是非常有洞察力的新功能,可以像海中那樣查找文檔。 – User3