0

下我的數據幀模式我如何訪問嵌套字段數據幀.proto,ScalaPB

root 
|-- name: string (nullable = true) 
|-- addresses: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- street: string (nullable = true) 
| | |-- city: string (nullable = true) 

我要輸出名稱和城市。以下是我的火花流應用程序,它輸出名稱和地址,但我想輸出中的名稱和城市。 感謝您的幫助。謝謝。

object PersonConsumer { 
    import org.apache.spark.sql.{SQLContext, SparkSession} 
    import com.example.protos.demo._ 

    def main(args : Array[String]) { 

    val spark = SparkSession.builder. 
     master("local") 
     .appName("spark session example") 
     .getOrCreate() 

    import spark.implicits._ 

    val ds1 = spark.readStream.format("kafka"). 
     option("kafka.bootstrap.servers","localhost:9092"). 
     option("subscribe","person").load() 

    val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Person.parseFrom(_)).select($"name", $"addresses") 

    ds2.printSchema() 

    val query = ds2.writeStream 
     .outputMode("append") 
     .format("console") 
     .start() 

    query.awaitTermination() 
    } 
} 

回答

0

你可以簡單獲得名稱和城市的數據幀,然後你可以使用它,用於獲取名稱和城市的數據框,你可以選擇這兩個如下

ds1.select("name","addresses.element.city") 
0

由於桑迪普。選擇(「名稱」,「addresses.element.city」)給我錯誤,因爲地址是Seq [地址],我希望輸出中的所有城市。

最後我寫了下面的函數來獲取所有的城市..

def getCities(addresses: Seq[Address]) : String = { 
     var cities:String = "" 
     if (addresses.size > 0) { 
     cities = (for(a <- addresses) yield a.city.getOrElse("")).mkString(",") 
//  cities = addresses.foldLeft("")((str,addr) => str + addr.city.getOrElse("")) 
     } 
     cities 
    }