2014-12-19 72 views
2

我創建了一個JSON數據,併爲它的Avro的模式:如何從HDFS中檢索Avro數據?

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 } 
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 } 

{ 「類型」: 「記錄」, 「名」: 「twitter_schema」 「namespace」:「com.miguno.avro」,「fields」:[{「012」:「name」:「username」, 「type」:「string」, 「doc」:「 com「},{ 」name「:」tweet「, 」type「:」string「, 」do c「:」用戶的Twitter消息的內容「},{ 」name「:」timestamp「, 」type「:」long「, 」doc「:」Unix epoch time in seconds「}]」doc :」: 「用於存儲的Twitter消息的基本模式」}

我然後將其轉化爲阿夫羅如下:

java -jar ~/avro-tools-1.7.4.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro 

將文件上HDFS與此:

hadoop fs -copyFromLocal twitter.avro <path> 
做的時候

import org.apache.avro.generic.GenericRecord 
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} 
import org.apache.hadoop.io.NullWritable 

val path = "hdfs:///path/to/your/avro/folder" 
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path) 

但是:

avroRDD.first 

我面臨以下異常:

org.apache.spark

,然後在星火CLI出具的本人代碼。 SparkException:由於階段失敗導致作業中止: 階段7.0(TID 13)中的任務2.0具有不可序列化的結果: org.apache.avro.mapred.AvroWrapper at org.apache.spark.scheduler.DAGScheduler.org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala:1174) 在 org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.適用(DAGScheduler.scala:1173) 在 scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray .scala:59) 在scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

什麼是它的解決方案?

回答

3

Spark正在嘗試對您的avro數據進行ser/de,但它不是「java serializable」(spark中使用的默認服務)。

你有幾個選擇:

  • 提取物從包裝通用記錄每條記錄映射到某個序列化結構
  • 生成特定記錄類和DESER給他們,而不是通用的記錄(你仍然會需要提取從包裝備案)
  • 使KRYO序列化(這將在工作,某些情況下只有

請注意,記錄會在內部重複使用,因此如果您例如使用rdd.collect,則最終將記錄具有相同值的所有記錄。在進行收集之前將原始輸入數據映射到其他東西,以便在複製時解決問題。