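Storm KafkaSpout KryoSerialization issue for a Java bean from a Kafka topic
Hi, I am new to Storm and Kafka. I am using Storm 1.0.1 and Kafka 0.10.0. We have a KafkaSpout that should receive a Java bean from a Kafka topic. I have spent several hours digging for the right way to do this and found quite a few useful articles, but so far none of the approaches has worked for me.
Here is my code: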
StormTopology:
public class StormTopology {
    public static void main(String[] args) throws Exception {
        // Topo test /zkroot test
        if (args.length == 4) {
            System.out.println("started");
            BrokerHosts hosts = new ZkHosts("localhost:2181");
            SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2], args[3]);
            kafkaConf1.zkRoot = args[2];
            kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true;
            kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
            kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme());
            KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1);
            System.out.println("started");
            ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]);
            AnalysisBolt analysisBolt = new AnalysisBolt(args[1]);
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1);
            // builder.setBolt("counterbolt2", countbolt2, 3).shuffleGrouping("kafkaspout");
            // Fields grouping needs two bolts, otherwise it won't work
            topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout");
            topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip"));
            Config config = new Config();
            config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class);
            config.setDebug(true);
            config.setNumWorkers(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(args[0], config, topologyBuilder.createTopology());
            // StormSubmitter.submitTopology(args[0], config,
            // builder.createTopology());
        } else {
            System.out.println("Insufficient arguments - topologyName kafkaTopic ZKRoot ID");
        }
    }
}
I serialize the data written to Kafka with Kryo.
KafkaProducer:
public class StreamKafkaProducer {
    private static Producer producer;
    private final Properties props = new Properties();
    private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer();

    private StreamKafkaProducer() {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.abc.serializer.MySerializer");
        producer = new org.apache.kafka.clients.producer.KafkaProducer(props);
    }

    public static StreamKafkaProducer getStreamKafkaProducer() {
        return KAFKA_PRODUCER;
    }

    public void produce(String topic, VehicleTrip vehicleTrip) {
        ProducerRecord<String, VehicleTrip> producerRecord = new ProducerRecord<>(topic, vehicleTrip);
        producer.send(producerRecord);
        // producer.close();
    }

    public static void closeProducer() {
        producer.close();
    }
}
Kryo serializer:
public class DataKyroSerializer extends Serializer<Data> implements Serializable {
    // Writes the two timestamps as epoch milliseconds, in this order
    @Override
    public void write(Kryo kryo, Output output, Data data) {
        output.writeLong(data.getStartedOn().getTime());
        output.writeLong(data.getEndedOn().getTime());
    }

    // Reads the timestamps back in the same order they were written
    @Override
    public Data read(Kryo kryo, Input input, Class<Data> aClass) {
        Data data = new Data();
        data.setStartedOn(new Date(input.readLong()));
        data.setEndedOn(new Date(input.readLong()));
        return data;
    }
}
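I need to get the data back into the Data bean.
According to several articles, I need to provide a custom scheme and make it part of the topology, but so far I have had no luck.
Code for the bolt and the scheme
Scheme: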
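To make clear what round trip I expect, here is a minimal sketch that writes the two timestamps as longs and reads them back in the same order. It deliberately uses plain `java.io` streams as a stand-in for Kryo's `Output` and `Input`, so it is not the Kryo wire format, and the `Data` class here is a hypothetical two-field stand-in for my real bean.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Date;

public class DateRoundTripDemo {

    // Hypothetical minimal bean, standing in for the real Data class
    static class Data {
        Date startedOn;
        Date endedOn;
    }

    // Writes both timestamps as epoch milliseconds (stand-in for Kryo's Output)
    static byte[] write(Data data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(data.startedOn.getTime());
            out.writeLong(data.endedOn.getTime());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the timestamps back in the order they were written
    static Data read(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            Data data = new Data();
            data.startedOn = new Date(in.readLong());
            data.endedOn = new Date(in.readLong());
            return data;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Data original = new Data();
        original.startedOn = new Date(1000L);
        original.endedOn = new Date(2000L);
        Data copy = read(write(original));
        System.out.println(copy.startedOn.equals(original.startedOn)); // prints true
        System.out.println(copy.endedOn.equals(original.endedOn));     // prints true
    }
}
```

What I want from the spout is the equivalent of `read(...)`: the bytes from the topic go in, a populated bean comes out.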
public class KryoScheme implements Scheme {
    private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.addDefaultSerializer(Data.class, new DataKyroSerializer());
            return kryo;
        }
    };

    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class));
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("data");
    }
}
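One thing I am unsure about in `deserialize`: `ByteBuffer.array()` returns the whole backing array regardless of the buffer's position and limit, and it throws if the buffer has no accessible array. The sketch below shows the difference with a hypothetical helper, `remainingBytes`, that copies only the readable bytes. I have not checked whether the buffer Storm hands to the scheme actually has an offset, so this is a precaution rather than a confirmed cause.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferBytesDemo {

    // Hypothetical helper: copies only the bytes between position and limit
    static byte[] remainingBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // leaves the caller's position untouched
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        byte[] backing = "headerPAYLOAD".getBytes(StandardCharsets.US_ASCII);
        // A buffer that exposes only the last 7 bytes of the 13-byte array
        ByteBuffer slice = ByteBuffer.wrap(backing, 6, 7);

        System.out.println(slice.array().length);         // prints 13
        System.out.println(remainingBytes(slice).length); // prints 7
        System.out.println(new String(remainingBytes(slice), StandardCharsets.US_ASCII)); // prints PAYLOAD
    }
}
```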
And the bolt:
public class AnalysisBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;
    private String topicname = null;

    public AnalysisBolt(String topicname) {
        this.topicname = topicname;
    }

    public void prepare(Map stormConf, TopologyContext topologyContext) {
        System.out.println("prepare");
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        System.out.println("execute");
        Fields fields = input.getFields();
        try {
            // Parse the second tuple field as a JSON string
            JSONObject eventJson = (JSONObject) JSONSerializer.toJSON((String) input
                    .getValueByField(fields.get(1)));
            String startTime = (String) eventJson.get("startedOn");
            String endTime = (String) eventJson.get("endedOn");
            String oid = (String) eventJson.get("_id");
            int vehicleId = (Integer) eventJson.get("vehicleId");
            // call method getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime)
            System.out.println("===========" + oid + "| " + vehicleId + "| " + startTime + "| " + endTime);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // cleanup(), declareOutputFields() and getComponentConfiguration() omitted
}
But when I submit the Storm topology, I get this error:
java.lang.IllegalStateException: Spout 'kafkaspout' contains a
non-serializable field of type com.abc.topology.KryoScheme$1, which
was instantiated prior to topology creation.
com.minda.iconnect.topology.KryoScheme$1 should be instantiated within
the prepare method of 'kafkaspout at the earliest.
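My reading of the error is that the spout, including its scheme, is Java-serialized when the topology is created, and the anonymous `ThreadLocal` subclass held by `KryoScheme` (the `KryoScheme$1` in the message) is not `Serializable`. The sketch below reproduces that outside Storm with plain Java serialization. `BrokenScheme` and `LazyScheme` are hypothetical stand-ins, with `StringBuilder` in place of `Kryo`, and the `transient` plus lazy-creation variant is only my guess at a fix, not something I have confirmed inside a topology.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SchemeSerializationDemo {

    // Mirrors KryoScheme: the anonymous ThreadLocal subclass is created
    // together with the object and is not Serializable
    static class BrokenScheme implements Serializable {
        private static final long serialVersionUID = 1L;
        private ThreadLocal<StringBuilder> state = new ThreadLocal<StringBuilder>() {
            @Override
            protected StringBuilder initialValue() {
                return new StringBuilder();
            }
        };
    }

    // Hypothetical alternative: the field is transient (skipped by Java
    // serialization) and only created on first use, after deserialization
    static class LazyScheme implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient ThreadLocal<StringBuilder> state;

        synchronized StringBuilder state() {
            if (state == null) {
                state = ThreadLocal.withInitial(StringBuilder::new);
            }
            return state.get();
        }
    }

    // Returns false if Java serialization rejects the object graph
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(new BrokenScheme())); // prints false
        System.out.println(isJavaSerializable(new LazyScheme()));   // prints true
    }
}
```

If this reading is right, creating the `Kryo` holder lazily inside `deserialize` (or keeping it in a `static` field, which Java serialization also skips) might get past the check, but I would like confirmation that this is the intended pattern.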
I would appreciate help debugging this issue and a pointer in the right direction.
Thanks