2016-12-01 65 views
0

我已經寫下面的代碼來連接火花流的kinesis,但沒有收到數據。無法讀取火花流連接中的數據Kinesis

VAL kinesisStream = KinesisUtils.createStream(SSC,APPNAME,streamName中,endpointUrl,regionName,InitialPositionInStream.LATEST,batchInterval,StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.print() // nothing getting printed here 

val data = kinesisStream.flatMap(byteArray => new String(byteArray)) 

data.foreachRDD { rdd =>   
     println("data==" + rdd.collect().length) // no data here too 
     rdd.collect()//.saveAsTextFile("file:///home/myHome/Code/sample/somedata.txt");   
    } 

我試圖寫入到S3和文件系統,它按文件夾寫文件名,並在我看到的只有_SUCCESS文件是零字節。

順便說一下,我也能寫,以相同的室壁運動流,並從Java

什麼是這裏的問題讀取數據。

+0

您是否找到了解決方案? – ArunDhaJ

回答

0

我得到了這個問題的解決方案。

代碼可以從kinesis中提取數據。與數據一起,它還生成大量的零字節部分文件。因爲其流式應用程序數據部分文件正在爲給定時間間隔生成,因此如果數據在該時間間隔內不可用,則生成零字節文件。

添加檢查以刪除DF代碼中的空白部分文件,以便DF只能寫入包含數據的零件文件。

我們在這個變化之後開始獲取數據。