
I have written the sample program below to persist an RDD as an Avro file.

I am using CDH 5.4 with Spark 1.3.

I wrote this .avsc file and generated the User class from it:

{"namespace": "com.abhi", 
"type": "record", 
"name": "User", 
"fields": [ 
    {"name": "firstname", "type": "string"}, 
    {"name": "lastname", "type": "string"} ] 
} 

This is how I generated the code for User:

java -jar ~/Downloads/avro-tools-1.7.7.jar compile schema User.avsc . 

Then I wrote my example:

package com.abhi 

import org.apache.hadoop.mapreduce.Job 
import org.apache.spark.SparkConf 
import org.apache.avro.generic.GenericRecord 
import org.apache.avro.mapred.AvroKey 
import org.apache.avro.mapreduce.{AvroKeyOutputFormat, AvroJob, AvroKeyInputFormat} 
import org.apache.hadoop.io.NullWritable 
import org.apache.spark.SparkContext 

object MySpark {
  def main(args: Array[String]): Unit = {
    val sf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("MySpark")
    val sc = new SparkContext(sf)

    val user1 = new User()
    user1.setFirstname("Test1")
    user1.setLastname("Test2")

    val user2 = new User("Test3", "Test4")

    // Construct via builder
    val user3 = User.newBuilder()
      .setFirstname("Test5")
      .setLastname("Test6")
      .build()

    val list = Array(user1, user2, user3)
    val userRdd = sc.parallelize(list)

    val job: Job = Job.getInstance()
    AvroJob.setOutputKeySchema(job, user1.getSchema)

    val output = "/user/cloudera/users.avro"
    userRdd.map(row => (new AvroKey(row), NullWritable.get()))
      .saveAsNewAPIHadoopFile(
        output,
        classOf[AvroKey[User]],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[User]],
        job.getConfiguration)
  }
}

I have two problems with this code.

Some of the imports are from the old MapReduce API, and I don't understand why they are needed in Spark code:

import org.apache.hadoop.mapreduce.Job 
import org.apache.avro.mapred.AvroKey 
import org.apache.avro.mapreduce.{AvroKeyOutputFormat, AvroJob, AvroKeyInputFormat}

The code throws an exception when I submit it to the Hadoop cluster. It does create an empty directory named /user/cloudera/users.avro in HDFS:

15/11/01 08:20:42 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir 
15/11/01 08:20:42 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 
15/11/01 08:20:42 INFO spark.SparkContext: Starting job: saveAsNewAPIHadoopFile at MySpark.scala:52 
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Got job 1 (saveAsNewAPIHadoopFile at MySpark.scala:52) with 2 output partitions (allowLocal=false) 
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Final stage: Stage 1(saveAsNewAPIHadoopFile at MySpark.scala:52) 
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Parents of final stage: List() 
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Missing parents: List() 
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[2] at map at MySpark.scala:51), which has no missing parents 
15/11/01 08:20:42 INFO storage.MemoryStore: ensureFreeSpace(66904) called with curMem=301745, maxMem=280248975 
15/11/01 08:20:42 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 65.3 KB, free 266.9 MB) 
15/11/01 08:20:42 INFO storage.MemoryStore: ensureFreeSpace(23066) called with curMem=368649, maxMem=280248975 
15/11/01 08:20:42 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 22.5 KB, free 266.9 MB) 
15/11/01 08:20:42 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:34630 (size: 22.5 KB, free: 267.2 MB) 
15/11/01 08:20:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0 
15/11/01 08:20:42 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839 
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[2] at map at MySpark.scala:51) 
15/11/01 08:20:42 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks 
15/11/01 08:20:42 ERROR scheduler.TaskSetManager: Failed to serialize task 1, not attempting to retry it. 
java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240) 
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150) 
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99) 
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) 
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:149) 
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:464) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:232) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:227) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:296) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:294) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 

The User class definition is missing from your code; it would be useful if you provided it. Just for the record, using [spark-avro](https://github.com/databricks/spark-avro) might be a better idea. – zero323


User is auto-generated with avro-tools. I added the .avsc file definition to the question above to show the definition of User. –
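
For reference, a minimal sketch of the spark-avro route suggested in the comment above. It assumes the com.databricks:spark-avro artifact is on the classpath and uses a plain case class (UserRow is only an illustrative name) instead of the Avro-generated User, so treat it as a sketch rather than a drop-in replacement:

import org.apache.spark.sql.SQLContext

// illustrative case class standing in for the Avro record
case class UserRow(firstname: String, lastname: String)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(Seq(
  UserRow("Test1", "Test2"),
  UserRow("Test3", "Test4"))).toDF()

// Spark 1.3-style write through the spark-avro data source;
// the output path here is only an example
df.save("/user/cloudera/users_avro", "com.databricks.spark.avro")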

Answer


The problem is that Spark cannot serialize your User class. Try switching to Kryo serialization, setting up a KryoRegistrator, and registering your class there.
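
A minimal sketch of what that could look like, assuming the generated User class is available on the executor classpath. The SparkConf.registerKryoClasses shortcut (available since Spark 1.2) registers the class directly; the explicit KryoRegistrator shown below it is an equivalent alternative, with MyRegistrator being a name chosen here for illustration:

import org.apache.spark.{SparkConf, SparkContext}

// switch Spark to Kryo serialization and register the Avro-generated class
val sf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("MySpark")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[User]))

val sc = new SparkContext(sf)

// alternative: an explicit registrator, wired in via spark.kryo.registrator
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[User])
  }
}

// then: .set("spark.kryo.registrator", "com.abhi.MyRegistrator")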