2016-07-19 79 views
0

Issue:物件不可序列化Spark Avro實木複合地板書寫器

請問您可以看看如何解決這個問題。能夠像正確打印一樣正確讀取它。但在寫入記錄實木複合地板越來越

對象不能序列

所致:java.io.NotSerializableException: parquet.avro.AvroParquetWriter序列化堆棧: - 對象不是 序列化(類:parquet.avro .AvroParquetWriter,值: [email protected]

請仔細閱讀,並讓我知道什麼是做到這一點的最好辦法。

代碼:Coverting的Avro記錄鑲木地板

val records = sc.newAPIHadoopRDD(conf.getConfiguration, 
    classOf[AvroKeyInputFormat[GenericRecord]], 
    classOf[AvroKey[GenericRecord]], //Transforms the PairRDD to RDD 
    classOf[NullWritable]).map(x => x._1.datum) 

    // Build a schema 
    val schema = SchemaBuilder 
    .record("x").namespace("x") 
    .fields 
    .name("x").`type`().stringType().noDefault() 
    .endRecord 

val parquetWriter = new AvroParquetWriter[GenericRecord](new Path(outPath), schema) 

val parquet = new GenericRecordBuilder(schema) 

records.foreach { keyVal => 
    val x = keyVal._1.datum().get("xyz") -- Field 
    parquet.set("x", x) 
     .build 
     parquetWriter.write(schema.build()) 
    } 

回答

0

我不知道爲什麼你正在服用你的方法。但我會推薦一種不同的方法。如果你把avro文件變成rdd,看起來就像你一樣。您可以創建一個模式,然後將RDD轉換爲數據幀,然後將數據幀寫入拼塊。

var avroDF = sqlContext.createDataFrame(avroRDD,avroSchema) 
avroDF 
    .write 
    .mode(SaveMode.Overwrite) 
    .parquet("parquet directory to write file") 
+0

謝謝你的方法。但問題是這是數組,列表,地圖的嵌套結構。非常大的嵌套avro。所以扁平化需要遍歷所有的元素,並得到所需的任何東西。 – Ankur

+0

如果您提出並接受了其中一個答案,那將會很好。我回答了你問的每個問題。 @Ankur – mark

0

對於我複雜的結構和數組複雜的Json我使用hive ql橫向視圖爆炸。這是一個扁平的複雜json的例子。它開始爲10行,對於一些痕跡我可以得到60行,有些我得到的不到5個。它只取決於它如何爆炸。

val tenj = sqlContext.read.json("file:///home/marksmith/hive/Tenfile.json") 

scala> tenj.printSchema 
root 

|-- DDIVersion: string (nullable = true) 
|-- EndTimestamp: string (nullable = true) 
|-- Stalls: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- Stall: long (nullable = true) 
| | |-- StallType: string (nullable = true) 
| | |-- TraceTypes: struct (nullable = true) 
| | | |-- ActiveTicket: struct (nullable = true) 
| | | | |-- Category: string (nullable = true) 
| | | | |-- Traces: array (nullable = true) 
| | | | | |-- element: struct (containsNull = true) 
| | | | | | |-- EndTime: string (nullable = true) 
| | | | | | |-- ID: string (nullable = true) 
| | | | | | |-- Source: string (nullable = true) 
| | | | | | |-- StartPayload: struct (nullable = true) 
| | | | | | | |-- SubticketID: string (nullable = true) 
| | | | | | | |-- TicketID: string (nullable = true) 
| | | | | | | |-- TicketState: long (nullable = true) 
| | | | | | |-- StartTime: string (nullable = true) 

tenj.registerTempTable("ddis") 


val sat = sqlContext.sql(
    "select DDIVersion, StallsExp.stall, StallsExp.StallType, at.EndTime, at.ID, 
     at.Source, at.StartPayload.SubTicketId, at.StartPayload.TicketID, 
     at.StartPayload.TicketState, at.StartTime 
    from ddis 
     lateral view explode(Stalls) st as StallsExp 
     lateral view explode(StallsExp.TraceTypes.ActiveTicket.Traces) at1 as at") 
sat: org.apache.spark.sql.DataFrame = [DDIVersion: string, stall: bigint, StallType: string, EndTime: string, ID: string, Source: string, SubTicketId: string, TicketID: string, TicketState: bigint, StartTime: string] 

sat.count 
res22: Long = 10 

sat.show 
+----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+ 
|DDIVersion|stall|StallType|    EndTime| ID|Source|SubTicketId|TicketID|TicketState|   StartTime| 
+----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+ 
| 5.3.1.11| 15| POPS4|2016-06-08T20:07:...| | STALL|   0|  777|   1|2016-06-08T20:07:...| 
| 5.3.1.11| 14| POPS4|2016-06-08T20:07:...| | STALL|   0|  384|   1|2016-06-08T20:06:...| 
| 5.3.1.11| 13| POPS4|2016-06-08T20:07:...| | STALL|   0| 135792|   1|2016-06-08T20:06:...| 
| 5.0.0.28| 26| POPS4|2016-06-08T20:06:...| | STALL|   0|  774|   2|2016-06-08T20:03:...| 
+0

謝謝馬克。你能否提供一種方式來讀取嵌套avro並獲取一些特定列並將其轉儲爲Parquet格式... – Ankur

1

,你可以從這裏開始的Avro的讀入數據幀一 https://github.com/databricks/spark-avro

// import needed for the .avro method to be added 
import com.databricks.spark.avro._ 

val sqlContext = new SQLContext(sc) 

// The Avro records get converted to Spark typesca 
val df = sqlContext.read.avro("src/test/resources/episodes.avro") 

df.registerTempTable("tempTable") 
val sat = sqlContext.sql(//use lateral view explode) 
sat.write.parquet("/tmp/output") 
+0

//使用橫向視圖爆炸的目的是什麼?爲什麼這需要? –

+0

你有一列是三件事的數組。通過使用橫向視圖爆炸,您可以將該行弄平,但它將是三行。除了數組的列以外,所有列都是相同的。這將有三個不同的值。 – mark

+0

爲什麼不單獨「爆炸」(沒有側視圖)?我知道我可以單獨使用「爆炸」來做到這一點,並一直想知道爲什麼要使用側面視圖。你也可以使用'Dataset.flatMap'。 –