
All I want to do is extract some information from an RDD and put it into a DataFrame using Spark (Scala). How can I extract the RDD's contents and load them into a DataFrame with Spark (Scala)?

So far I have created a streaming pipeline that connects to a Kafka topic and puts the topic's contents into an RDD:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val sparkConf = new SparkConf().setAppName("KafkaToDataFrame") // app name and batch interval are placeholders
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Kafka consumer configuration
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("vittorio")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Keep only the message payload of each Kafka record
val row = stream.map(record => record.value)

row.foreachRDD { (rdd: RDD[String], time: Time) =>
  rdd.collect.foreach(println)

  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  import spark.implicits._
  val DF = rdd.toDF()

  DF.show()
}

ssc.start()    // Start the computation
ssc.awaitTermination()

object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

The content of my RDD is now:

{"event":"bank.legal.patch","ts":"2017-04-15T15:18:32.469+02:00","svc":"dpbank.stage.tlc-1","request":{"ts":"2017-04-15T15:18:32.993+02:00","aw":"876e6d71-47c4-40f6-8c49-5dbd7b8e246b","end_point":"/bank/v1/legal/mxHr+bhbNqEwFvXGn4l6jQ==","method":"PATCH","app_instance":"e73e93d9-e70d-4873-8f98-b00c6fe4d036-1491406011","user_agent":"Dry/1.0.st/Android/5.0.1/Sam-SM-N910C","user_id":53,"user_ip":"151.14.81.82","username":"7cV0Y62Rud3MQ==","app_id":"db2ffeac6c087712530981e9871","app_name":"DrApp"},"operation":{"scope":"mdpapp","result":{"http_status":200}},"resource":{"object_id":"mxHr+bhbNqEwFvXGn4l6jQ==","request_attributes":{"legal_user":{"sharing_id":"mxHr+bhbNqEwFvXGn4l6jQ==","ndg":"","taxcode":"IQ7hUUphxFBXnI0u2fxuCg==","status":"INCOMPLETE","residence":{"city":"CAA","address":"Via Batto 44","zipcode":"926","country_id":18,"city_id":122},"business_categories":[5],"company_name":"4Gzb+KJk1XAQ==","vat_number":"162340159"}},"response_attributes":{"legal_user":{"sharing_id":"mGn4l6jQ==","taxcode":"IQ7hFBXnI0u2fxuCg==","status":"INCOMPLETE","residence":{"city":"CATA","address":"Via Bllo 44","zipcode":"95126","country_id":128,"city_id":12203},"business_categories":[5],"company_name":"4GnU/Nczb+KJk1XAQ==","vat_number":"12960159"}}},"class":"DPAPI"} 

and running val DF = rdd.toDF() shows:

+--------------------+ 
|    value| 
+--------------------+ 
|{"event":"bank.le...| 
+--------------------+ 

What I would like to achieve is a DataFrame that keeps being populated as new RDDs arrive from the stream. Something like a union, but I am not sure that is the right approach, because I am not certain every RDD will have the same schema.

For example, this is what I would like to end up with:

+--------------------+------------+----------+-----+ 
|     _id|  user_ip| status|_type| 
+--------------------+------------+----------+-----+ 
|AVtJFVOUVxUyIIcAklfZ|151.14.81.82|INCOMPLETE|DPAPI| 
|AVtJFVOUVxUyIIcAklfZ|151.14.81.82|INCOMPLETE|DPAPI| 
+--------------------+------------+----------+-----+ 

Thanks!

Answers


If your RDD is

{"event":"bank.legal.patch","ts":"2017-04-15T15:18:32.469+02:00","svc":"dpbank.stage.tlc-1","request":{"ts":"2017-04-15T15:18:32.993+02:00","aw":"876e6d71-47c4-40f6-8c49-5dbd7b8e246b","end_point":"/bank/v1/legal/mxHr+bhbNqEwFvXGn4l6jQ==","method":"PATCH","app_instance":"e73e93d9-e70d-4873-8f98-b00c6fe4d036-1491406011","user_agent":"Dry/1.0.st/Android/5.0.1/Sam-SM-N910C","user_id":53,"user_ip":"151.14.81.82","username":"7cV0Y62Rud3MQ==","app_id":"db2ffeac6c087712530981e9871","app_name":"DrApp"},"operation":{"scope":"mdpapp","result":{"http_status":200}},"resource":{"object_id":"mxHr+bhbNqEwFvXGn4l6jQ==","request_attributes":{"legal_user":{"sharing_id":"mxHr+bhbNqEwFvXGn4l6jQ==","ndg":"","taxcode":"IQ7hUUphxFBXnI0u2fxuCg==","status":"INCOMPLETE","residence":{"city":"CAA","address":"Via Batto 44","zipcode":"926","country_id":18,"city_id":122},"business_categories":[5],"company_name":"4Gzb+KJk1XAQ==","vat_number":"162340159"}},"response_attributes":{"legal_user":{"sharing_id":"mGn4l6jQ==","taxcode":"IQ7hFBXnI0u2fxuCg==","status":"INCOMPLETE","residence":{"city":"CATA","address":"Via Bllo 44","zipcode":"95126","country_id":128,"city_id":12203},"business_categories":[5],"company_name":"4GnU/Nczb+KJk1XAQ==","vat_number":"12960159"}}},"class":"DPAPI"} 

then you can read the RDD into a valid DataFrame with sqlContext.read.json and select only the fields you need:

val df = sqlContext.read.json(rdd)   // rdd is an RDD[String] of JSON records

df.select($"request.user_id"as("user_id"), 
      $"request.user_ip"as("user_ip"), 
      $"request.app_id"as("app_id"), 
      $"resource.request_attributes.legal_user.status"as("status"), 
      $"class") 
    .show(false) 

This results in the following DataFrame:

+-------+------------+---------------------------+----------+-----+ 
|user_id|user_ip  |app_id      |status |class| 
+-------+------------+---------------------------+----------+-----+ 
|53  |151.14.81.82|db2ffeac6c087712530981e9871|INCOMPLETE|DPAPI| 
+-------+------------+---------------------------+----------+-----+ 

You can use the approach above to pull out whichever columns you need. I hope the answer helps.
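If it helps, here is a rough sketch of how the same read.json and select could sit inside the question's foreachRDD, using the SparkSessionSingleton instead of a separate sqlContext. The selected columns are illustrative, and the Dataset overload of read.json assumes Spark 2.2 or later:

row.foreachRDD { (rdd: RDD[String], time: Time) =>
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  import spark.implicits._

  if (!rdd.isEmpty()) {
    // read.json(Dataset[String]) needs Spark 2.2+; on older versions
    // spark.read.json(rdd) (deprecated) does the same job.
    val df = spark.read.json(spark.createDataset(rdd))
    df.select(
      $"request.user_ip".as("user_ip"),
      $"resource.request_attributes.legal_user.status".as("status"),
      $"class".as("_type")
    ).show(false)
  }
}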


Another answer: you can union each batch's DataFrame with the one accumulated so far.

First, build an empty DataFrame with the required schema at the start of the program:

val df = // here create DF with required schema 
df.createOrReplaceTempView("savedDF") 
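One possible way to fill in that placeholder, assuming the four columns from the desired output in the question (a sketch, not the asker's actual schema):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical schema matching the desired output; use whatever
// columns you actually extract from the JSON.
val schema = StructType(Seq(
  StructField("_id", StringType),
  StructField("user_ip", StringType),
  StructField("status", StringType),
  StructField("_type", StringType)
))

val spark = SparkSession.builder.getOrCreate()
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
emptyDF.createOrReplaceTempView("savedDF")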

Now, inside foreachRDD:

// here we are in foreachRDD 
val df = // create DataFrame from RDD 
val existingCachedDF = spark.table("savedDF") // get reference to existing DataFrame 
val union = existingCachedDF.union(df) 
union.createOrReplaceTempView("savedDF") 

A good idea would be to checkpoint the DataFrame every few micro-batches, to truncate its otherwise ever-growing logical plan.
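A minimal sketch of what that could look like, continuing the snippet above (the checkpoint directory and the every-10-batches policy are assumptions):

// once, at startup: where checkpointed data will be written (path is illustrative)
spark.sparkContext.setCheckpointDir("/tmp/streaming-df-checkpoints")

var batchCount = 0L

// inside foreachRDD, after building `union` as shown above:
batchCount += 1
val snapshot =
  if (batchCount % 10 == 0) union.checkpoint()  // materialize and cut the lineage
  else union
snapshot.createOrReplaceTempView("savedDF")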

Another idea is to use Structured Streaming, which is set to replace Spark Streaming.
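A minimal sketch of that route, assuming the same broker and topic as the question and the spark-sql-kafka-0-10 connector; from_json needs an explicit schema, abbreviated here to the handful of fields selected earlier:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("vittorio-structured").getOrCreate()
import spark.implicits._

// Abbreviated schema; in practice declare every field you want to extract.
val schema = new StructType()
  .add("class", StringType)
  .add("request", new StructType().add("user_ip", StringType))
  .add("resource", new StructType()
    .add("request_attributes", new StructType()
      .add("legal_user", new StructType().add("status", StringType))))

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "vittorio")
  .load()
  .select(from_json($"value".cast("string"), schema).as("j"))
  .select($"j.request.user_ip".as("user_ip"),
          $"j.resource.request_attributes.legal_user.status".as("status"),
          $"j.class".as("_type"))

parsed.writeStream.format("console").start().awaitTermination()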
