0
我正在研究基於 Spark 的 Kafka Consumer,它讀取 Avro 格式的數據。以下是讀取和處理輸入的 try/catch 代碼。在 Spark 上運行時出現「Task not serializable(任務無法序列化)」錯誤。
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

import kafka.producer.KeyedMessage;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;
public class myKafkaConsumer{
/**
* Main function, entry point to the program.
* @param args, takes the user-ids as the parameters, which
*will be treated as topics
* in our case.
*/
private String [] topics;
private SparkConf sparkConf;
private JavaStreamingContext jssc;
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public static void main(String [] args){
if(args.length < 1){
System.err.println("Usage : myKafkaConsumber <topics/user-id>");
System.exit(1);
}
myKafkaConsumer bKC = new myKafkaConsumer(args);
bKC.run();
}
/**
* Constructor
*/
private myKafkaConsumer(String [] topics){
this.topics = topics;
sparkConf = new SparkConf();
sparkConf = sparkConf.setAppName("JavaDirectKafkaFilterMessages");
jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
}
/**
* run function, runs the entire program.
* @param topics, a string array containing the topics to be read from
* @return void
*/
private void run(){
HashSet<String> topicSet = new HashSet<String>();
for(String topic : topics){
topicSet.add(topic);
System.out.println(topic);
}
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "128.208.244.3:9092");
kafkaParams.put("auto.offset.reset", "smallest");
try{
JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
byte[].class,
StringDecoder.class,
DefaultDecoder.class,
kafkaParams,
topicSet
);
JavaDStream<String> avroRows = messages.map(new Function<Tuple2<String, byte[]>, String>(){
public String call(Tuple2<String, byte[]> tuple2){
return testFunction(tuple2._2().toString());
}
});
avroRows.print();
jssc.start();
jssc.awaitTermination();
}catch(Exception E){
System.out.println(E.toString());
E.printStackTrace();
}
}
private static String testFunction(String str){
System.out.println("Input String : " + str);
return "Success";
}
}
代碼可以正確編譯,但是當我嘗試在 Spark 集羣上運行這段代碼時,我得到「任務無法序列化(Task not serializable)」錯誤。我試過刪除該函數,只打印一些文字,但錯誤仍然存在。
P.S.我檢查了打印信息並發現它們被正確讀取。
我嘗試了你的建議並編輯了avroRows變量,但是錯誤仍然存在。 – Jayant
你在哪裏定義函數'functionToProcessByteArray'?它是不可序列化類的方法嗎? –
我創建了一個只使用String類的測試函數。所以,該方法使用可序列化的類。 'private static String testFunction(String str){System.out.println(「Input String:」+ str); return「Success」;' } – Jayant