2015-07-21 88 views
1

我有一個卡夫卡生產者,其從目錄中讀取和KafkaUtils.createDirectStream一個String對象星火

def main(args: Array[String]) { 
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = Array("quickstart.cloudera:9092", "test","10","10") 
val directoryPath = "/home/cloudera/Documents/config/" 
// Zookeeper connection properties 
val props = new HashMap[String, Object]() 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer") 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer") 

val producer = new KafkaProducer[String, String](props) 

val myDirectory= new File(directoryPath) 
var lines ="" 
for (file <- myDirectory.listFiles) { 
    lines = scala.io.Source.fromFile(file).mkString 

    val message = new ProducerRecord[String, String](topic, null, lines) 
    producer.send(message) 
    print(lines) 
    Thread.sleep(1000) 
} 

同樣我使用的火花直接流爲我的消費文件的內容寫入到主題

val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2) 

val str = lines.print(10) 

我可以打印文件的內容。 我正在使用單個主題。 我必須從此DStream中獲取RDD,並將整個內容轉換爲字符串對象,以便將其傳遞給方法。 有人可以幫忙嗎?

回答

5

你正在尋找的API是:

DStream.foreachRDD(func) 

它適用的功能,FUNC,從流生成的每個RDD。 所以,爲您的使用情況下,我可能會寫下面的代碼:

lines.foreachRDD(rdd => { 
    val data = rdd.collect().mkString("\n") 
    println(data) 
}) 

請注意,由於這些代碼在驅動程序運行時,你必須確保它有足夠的資源來處理指定的文件。 通常,應該使用此API將每個RDD中的數據推送到外部系統,例如將RDD保存到文件,或通過網絡將其寫入數據庫。

您可以進一步閱讀關於Spark's programming guide上DStreams的其他輸出操作。