2017-04-14 44 views
1

如何使用Spark Streaming for Java API實現以下功能?如何從Kafka中讀取所有記錄(從開始到開始),然後停止StreamingContext?

  1. 閱讀所有從卡夫卡日誌壓實主題的郵件(我用它來存儲所有的用戶配置文件數據)的每條消息是單個用戶配置文件數據。
  2. 一旦所有數據消耗完畢,不要等待下一個數據進入kafka管道並停止流式上下文。

我非常新的火花流API,我不知道如何使火花流上下文停止等待更多的消息來和進度提前與它有任何的數據。

回答

0

您應該然後使用KafkaUtils.createRDD說:

使用每個主題和分區偏移範圍是從卡夫卡創建RDD。這允許您指定Kafka領導連接(優化獲取)並訪問消息以及元數據。

這將拉動所有的記錄按offsetRanges

+1

完美。這是我需要的。我會試試這個。 – user1699361