2016-03-15 43 views
0

我收到有幾行,並在Java中使用該命令在星火流屬性一個JSON輸入:如何保持一個JSON結構,當我把它給星火流

JavaReceiverInputDStream <String> 
lines = ssc.socketTextStream 
(localhost, port) 

我現在想過濾線條流,以便它在每一行中只有兩個特定的屬性,並轉儲其餘的。

這裏的問題是,我注意到,行不留JSON結構,即,我不能只是做

JavaDstream<String> line=lines[1]; 
line.print() ; 

我的問題是,如何才能讓我的JavaDStream保持結構的JSON對象,然後打印我想要的行?

我希望我很清楚,謝謝。

回答

1

曼努埃爾,

所以基本上你問怎麼做才能確保整個有效載荷JSON是單個記錄在RDD或者是什麼,當你對套接字發送消息記錄邊界。

所以基本上你使用socketTextStream讀取socket上的消息,如果它發現一個新的行字符,它將使用它作爲記錄邊界。收聽插座,閱讀消息,並把它傳遞給星火實際的代碼)方法SocketReceiver.bytesToLines的一部分(如果你看一下評論,這是它說什麼

/** 
* This methods translates the data from an inputstream (say, from a socket) 
* to '\n' delimited strings and returns an iterator to access the strings. 
*/ 

所以一定要拿出\從您的發送火花單記錄

蘇尼爾

+0

親愛的蘇尼爾的JSON消息n字符, 謝謝您的回答。但它不完全是我想要的。我想要的是這樣的: 我有一個JSON文件,我發送到Spark Streaming使用socketTextStream,稱爲'線'。如果我想打印整個'行'dstream,我只是做lines.print,它的工作原理。 但是,如果我想打印一個特定的屬性值(例如第一行中的第一個屬性的值,[1,1]行,我不能。 我該怎麼做? –

+0

Basicaly我想要一個line對應於流中的一個RDD –

+0

如果您對將JSON文件轉換爲Stream感興趣,您可能需要使用SparkStreamingContext.textFileStream(directoryToMonitor)的sparkStream概念,它的工作方式是可以監視特定HDFS目錄中的文件當出現一個新的JSON文件時,Spark會將它轉換爲RDD,你可以在http://wpcertification.blogspot.com/2016/01/monitoring-hdfs-directory-for-new-files.html –