0

我試圖做使用兩個室壁運動流查詢如何在1個Spark Streaming應用程序中解析來自2個Kinesis流的數據?

select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id

。但是Stream2中缺少輸出,有人知道如何將兩個數據流同時加入或寫入hbase或Parquet嗎?這裏是我的代碼,我設置SparkConf().set("spark.streaming.concurrentJobs", "2")處理兩個流:

val numShards_s1 = kinesisClient.describeStream("stream1").getStreamDescription().getShards().size 
val numShards_s2 = kinesisClient.describeStream("stream2").getStreamDescription().getShards().size 
val numStreams_s1 = numShards_s1 
val numStreams_s2 = numShards_s2 
val batchInterval = Seconds(5) 
val kinesisClient = new AmazonKinesisClient(credentials)kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com") 
val kinesisCheckpointInterval = batchInterva 
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() 
val ssc = new StreamingContext(sc, batchInterval) 
val kinesisStreams_s1 = (0 until numStreams_s1).map { i => 
    KinesisUtils.createStream(ssc, "stream-demo", "stream1", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) 
} 
    val kinesisStreams_s2 = (0 until numShards_s2).map { i => 
    KinesisUtils.createStream(ssc, "stream-demo", "stream2", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) 
} 
val unionStreams_s1 = ssc.union(kinesisStreams_s1) 
val unionStreams_s2 = ssc.union(kinesisStreams_s2) 
val schemaString_s1 = "user_id,device_id,action,timestamp 
val schemaString_s2= "device_id,domain,timestamp 
val tableSchema_s1 = StructType(schemaString_s1.split(",").map(fieldName => StructField(fieldName, StringType, true))) 
val tableSchema_s2 = StructType(schemaString_s2.split(",").map(fieldName => StructField(fieldName, StringType, true))) 

unionStreams_s1.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { 
    val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(","))) 
    val output1 = sqlContext.createDataFrame(rowRDD,tableSchema_s1) 
    output1.createOrReplaceTempView("realTimeTable_1")}) 

unionStreams_s2.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { 
    val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(","))) 
    val output2 = sqlContext.createDataFrame(rowRDD,tableSchema_s2) 
    output1.createOrReplaceTempView("realTimeTable_2")}) 

所以他們在理論上我應該能夠執行:

select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id 

然而,即使這樣做select * from realTimeTable_2不產生任何輸出,我想我的代碼缺少一些東西,請問任何人都可以找到缺少的邏輯嗎?

+0

我不能使用JVM兩個上下文按照火花流媒體文件之一:只有一個的StreamingContext可以在同一時間一個JVM是活動的。 –

回答

0

在拼接機器上,我們從來沒有試過雙流只有一個流,然後通過SQL加入持久數據。

我沒有看到流的開始?這裏的代碼與你的代碼非常相似,我希望它有幫助。

查看Spark主分支上的KinesisWordCountASL.scala。

這是短期的鏈接。

https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

+0

謝謝John,你有沒有解析過兩種不同的輸入模式?我已經看到了附加的代碼,不幸的是它將兩個流視爲單獨的碎片。我的問題是如何啓用複雜事件處理框架,以便可以同時分析兩個具有2 x不同的電子現場數據集。 –

+0

以ssc.start()開始流式傳輸 –

相關問題