2016-03-09 87 views
2

我試圖讓使用KafkaRDD運行例如:KafkaRDD斯卡拉小例子

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 
val offsetRanges = Array(
    OffsetRange("topic", 0, 0, 2) 
) 
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges) 
rdd.map(x => println(x)).collect() 

res: Array[Unit] = Array((),()) 

我一直在與單個分區創建「主題」,寫2級的消息,你好,世界慎重。

我能得到什麼看起來像一個正確的RDD,但我怎麼能訪問其內容?我錯過了什麼嗎?

感謝,E.

回答

3

問題是這條線,我相信:

rdd.map(x => println(x)).collect() 

的RDD的工作方式,rdd.map上運行的執行程序。當你println它將它打印到執行者stdout。要將其打印到驅動程序應用程序中的stdout,請嘗試使用此方法:

rdd.collect().map(x => println(x))