2017-04-21 67 views
0

我有這樣一段代碼:試圖瞭解火花流流

val lines: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics) 
    lines.foreachRDD { rdd => 
     val df = cassandraSQLContext.read.json(rdd.map(x => x._2)) 
     sparkStreamingService.run(df) 
    } 
    ssc.start() 
    ssc.awaitTermination() 

我的理解是,foreachRDD在驅動程序級別發生的呢?所以基本上所有的代碼塊:

lines.foreachRDD { rdd => 
    val df = cassandraSQLContext.read.json(rdd.map(x => x._2)) 
    sparkStreamingService.run(df) 
} 

發生在驅動程序級別?所述sparkStreamingService.run(DF)方法基本上沒有對電流數據幀一些轉換,以產生一個新的數據幀,然後調用存儲數據幀到另一個卡桑德拉方法(在另一個罐)。 因此,如果這種情況都發生在驅動程序級別,我們沒有利用火花執行程序,我怎樣才能做到這一點,以便並行使用執行程序來並行處理RDD的每個分區

我的火花流服務運行方法:

var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect() 
metadataDataframe.foreach(rowD => { 
     metaData = populateMetaDataService.populateSiteMetaData(rowD) 
     val headers = (rowD.getString(2).split(recordDelimiter)(0)) 

     val fields = headers.split("\u0001").map(
     fieldName => StructField(fieldName, StringType, nullable = true)) 
     val schema = StructType(fields) 

     val listOfRawData = rowD.getString(2).indexOf(recordDelimiter) 
     val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1) 

     val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter)) 
//  val rawData = dataWithoutHeaders.split(recordDelimiter) 
     val rowRDD = rawData 
     .map(_.split("\u0001")) 
     .map(attributes => Row(attributes: _*)) 

     val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema) 
     dataFrameFilterService.processBasedOnOpType(metaData, newDF) 
    }) 

回答

1

foreachRDD可以在本地運行,但只是意味着設置。 RDD本身是一個分佈式集合,所以實際的工作是分佈式的。

直接在代碼的文檔註釋:

dstream.foreachRDD { rdd => 
    val connection = createNewConnection() // executed at the driver 
    rdd.foreach { record => 
    connection.send(record) // executed at the worker 
    } 
} 

請注意,這不是圍繞RDD基於代碼的部分在驅動程序執行。這是使用分發給工作人員的RDD建立的代碼。

你的代碼將專門被註釋如下:

//df.select will be distributed, but collect will pull it all back in 
    var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect() 
//Since collect created a local collection then this is done on the driver 
metadataDataframe.foreach(rowD => { 
     metaData = populateMetaDataService.populateSiteMetaData(rowD) 
     val headers = (rowD.getString(2).split(recordDelimiter)(0)) 

     val fields = headers.split("\u0001").map(
     fieldName => StructField(fieldName, StringType, nullable = true)) 
     val schema = StructType(fields) 

     val listOfRawData = rowD.getString(2).indexOf(recordDelimiter) 
     val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1) 

     //This will run locally, creating a distributed record 
     val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter)) 
//  val rawData = dataWithoutHeaders.split(recordDelimiter) 
     //This will redistribute the work 
     val rowRDD = rawData 
     .map(_.split("\u0001")) 
     .map(attributes => Row(attributes: _*)) 
     //again, setting this up locally, to be run distributed 
     val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema) 
     dataFrameFilterService.processBasedOnOpType(metaData, newDF) 
    }) 

最終,你可能可以重寫這並不需要收集和保存所有分佈的,但這是你沒有計算器

+0

但在這裏:http://spark.apache.org/docs/latest/streaming-programming-guide.html,如果你向下滾動到他們使用forEachRdd的地方,他們有一個評論說,一個特定的聲明正在以@Ahmed司機 – Ahmed

+0

執行我編輯它直接與比文檔 –

+0

是,這樣是我的問題增加了更多的明確性解決這一問題,就收集。到現在爲止,由於我正在收集,記錄是按順序處理的,並且這些記錄中的每一個都會分發給執行者?然後刪除收集,所有的記錄將被並行處理而不是順序處理,是嗎? – Ahmed

1

的調用foreachRDD確實發生在驅動程序節點上。但是,由於我們正在RDD級別運營,因此將對其進行分配。在你的例子中,rdd.map將導致每個分區被髮送到特定的工作節點進行計算。

因爲我們不知道你的sparkStreamingService.run方法是幹什麼的,我們不能告訴你關於其執行的地方。

+0

我添加了run方法的代碼。這是最有效的方法嗎?此方法被並行右:dataFrameFilterService.processBasedOnOpType(元數據,newDF)。我理解它的方式是我可以避免使用collect來加速進程,以便並行處理記錄? – Ahmed