2016-03-15

I am working on a Spark-based Kafka consumer that reads data in Avro format. Below is the try/catch code that reads and processes the input. Spark throws a "Task not serializable" error when reading the serialized input.

import java.util.*; 
import java.io.*; 

import com.twitter.bijection.Injection; 
import com.twitter.bijection.avro.GenericAvroCodecs; 
import kafka.serializer.StringDecoder; 
import kafka.serializer.DefaultDecoder; 
import scala.Tuple2; 

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericRecord; 

import kafka.producer.KeyedMessage; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.SparkConf; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.api.java.function.*; 
import org.apache.spark.streaming.api.java.*; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.apache.spark.streaming.Durations; 

public class myKafkaConsumer{ 
    /**
    * Main function, entry point to the program.
    * @param args, takes the user-ids as the parameters, which
    * will be treated as topics in our case.
    */
    private String [] topics; 
    private SparkConf sparkConf; 
    private JavaStreamingContext jssc; 

    public static final String USER_SCHEMA = "{" 
      + "\"type\":\"record\"," 
      + "\"name\":\"myrecord\"," 
      + "\"fields\":[" 
      + " { \"name\":\"str1\", \"type\":\"string\" }," 
      + " { \"name\":\"int1\", \"type\":\"int\" }" 
      + "]}"; 

    public static void main(String [] args){
        if(args.length < 1){
            System.err.println("Usage : myKafkaConsumer <topics/user-id>");
            System.exit(1);
        }
        myKafkaConsumer bKC = new myKafkaConsumer(args);
        bKC.run();
    }

    /** 
    * Constructor 
    */ 
    private myKafkaConsumer(String [] topics){
        this.topics = topics;
        sparkConf = new SparkConf();
        sparkConf = sparkConf.setAppName("JavaDirectKafkaFilterMessages");
        jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    }

    /** 
    * run function, runs the entire program. 
    * @param topics, a string array containing the topics to be read from 
    * @return void 
    */ 
    private void run(){
        HashSet<String> topicSet = new HashSet<String>();
        for(String topic : topics){
            topicSet.add(topic);
            System.out.println(topic);
        }
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "128.208.244.3:9092");
        kafkaParams.put("auto.offset.reset", "smallest");
        try{
            JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                byte[].class,
                StringDecoder.class,
                DefaultDecoder.class,
                kafkaParams,
                topicSet
            );

            JavaDStream<String> avroRows = messages.map(new Function<Tuple2<String, byte[]>, String>(){
                public String call(Tuple2<String, byte[]> tuple2){
                    return testFunction(tuple2._2().toString());
                }
            });
            avroRows.print();
            jssc.start();
            jssc.awaitTermination();
        }catch(Exception E){
            System.out.println(E.toString());
            E.printStackTrace();
        }
    }

    private static String testFunction(String str){
        System.out.println("Input String : " + str);
        return "Success";
    }
} 

The code compiles correctly; however, when I try to run it on the Spark cluster, I get a "Task not serializable" error. I tried removing the function and just printing some text, but the error still persists.

P.S. I checked the printed messages and found that they are read correctly.

Answer


The print statement collects your RDD onto the driver so that it can be printed on screen. A task like that triggers serialization/deserialization of the data.

For your code to work, the records in the avroRows DStream must be of a serializable type.

For example, it should work if you replace the avroRows definition with this:

JavaDStream<String> avroRows = messages.map(new Function<Tuple2<String, byte[]>, String>(){ 
    public String call(Tuple2<String, byte[]> tuple2){ 
     return tuple2._2().toString(); 
    } 
}); 

I just added a toString to your record, since the String type is serializable (of course, it is not necessarily what you need; it is just an example).
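
Since the question already imports Bijection's Injection and GenericAvroCodecs but never uses them, below is a minimal sketch (an illustration, not the poster's actual code) of how the Avro byte[] payload could be decoded into a GenericRecord inside the map. It assumes the USER_SCHEMA string and the field names str1/int1 from the question, and that the class sits in the same package as myKafkaConsumer; the class and method names AvroDecodeSketch and decode are made up for the example.

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import scala.Tuple2;

// Sketch only: decodes the Avro byte[] payload back into a GenericRecord,
// using the USER_SCHEMA string defined in the question's class.
public class AvroDecodeSketch {

    // The Injection lives in a static field, so it is rebuilt when this class
    // is loaded on each executor and is never shipped inside the closure.
    private static final Injection<GenericRecord, byte[]> recordInjection;

    static {
        Schema schema = new Schema.Parser().parse(myKafkaConsumer.USER_SCHEMA);
        recordInjection = GenericAvroCodecs.toBinary(schema);
    }

    // messages is the JavaPairInputDStream<String, byte[]> built in run()
    static JavaDStream<String> decode(JavaPairInputDStream<String, byte[]> messages) {
        return messages.map(new Function<Tuple2<String, byte[]>, String>() {
            public String call(Tuple2<String, byte[]> tuple2) {
                // invert() turns the Avro bytes back into a record; get()
                // unwraps the scala.util.Try returned by Bijection
                GenericRecord record = recordInjection.invert(tuple2._2()).get();
                return record.get("str1") + " / " + record.get("int1");
            }
        });
    }
}

Keeping the Injection in a static field means nothing non-serializable has to be serialized with the task, which is one common way to use Bijection from Spark closures.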


I tried your suggestion and edited the avroRows variable, but the error still persists. – Jayant


Where do you define the function 'functionToProcessByteArray'? Is it a method of a non-serializable class? –


I created a test function that only uses the String class, so the method only uses serializable classes. 'private static String testFunction(String str){ System.out.println("Input String : " + str); return "Success"; }' – Jayant
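
For reference, the usual cause of "Task not serializable" with anonymous Functions in Java is not the types used inside the function but the implicit reference to the enclosing instance: an anonymous class created inside the instance method run() captures the surrounding myKafkaConsumer object, whose JavaStreamingContext field is not serializable. The sketch below shows one possible fix under that assumption (a guess at the cause, not a confirmed diagnosis): move the mapper into a static nested class inside myKafkaConsumer, which holds no hidden reference to the outer object. The class name RowMapper is made up for the example.

// Inside myKafkaConsumer: a static nested class keeps no implicit
// reference to the outer instance, so only the Function itself is
// serialized with the task.
private static class RowMapper implements Function<Tuple2<String, byte[]>, String> {
    public String call(Tuple2<String, byte[]> tuple2){
        return testFunction(tuple2._2().toString());
    }
}

// ...and in run():
JavaDStream<String> avroRows = messages.map(new RowMapper());

Note that org.apache.spark.api.java.function.Function already extends java.io.Serializable, so implementing it is enough; no extra interface is needed.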
