Storm KafkaSpout KryoSerialization issue with java beans from a Kafka topic

Hi, I am new to Storm and Kafka. I am using Storm 1.0.1 and Kafka 0.10.0. We have a KafkaSpout that receives java beans from a Kafka topic. I have spent several hours digging to find the right approach. I have found a lot of helpful articles, but so far none of the approaches have worked for me.

Below is my code:

StormTopology:

public class StormTopology { 

public static void main(String[] args) throws Exception { 
    //Topo test /zkroot test 
    if (args.length == 4) { 
     System.out.println("started"); 
     BrokerHosts hosts = new ZkHosts("localhost:2181"); 

     SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2], 
       args[3]); 

     kafkaConf1.zkRoot = args[2]; 
     kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true; 
     kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); 
     kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme()); 
     KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1); 

     System.out.println("started"); 

     ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]); 
     AnalysisBolt analysisBolt = new AnalysisBolt(args[1]); 
     TopologyBuilder topologyBuilder = new TopologyBuilder(); 
     topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1); 
     //builder.setBolt("counterbolt2", countbolt2, 3).shuffleGrouping("kafkaspout"); 
     //For fields grouping we need a separate upstream bolt, or it won't work 
     topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout"); 
     topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip")); 
     Config config = new Config(); 
     config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class); 
     config.setDebug(true); 
     config.setNumWorkers(1); 

     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology(args[0], config, topologyBuilder.createTopology()); 

     // StormSubmitter.submitTopology(args[0], config, 
     // builder.createTopology()); 

    } else { 
     System.out 
       .println("Insufficient arguments - topologyName kafkaTopic ZKRoot ID"); 
    } 
} 

}

I am using Kryo to serialize the data into Kafka.

KafkaProducer:

public class StreamKafkaProducer { 

private static Producer<String, VehicleTrip> producer; 
private final Properties props = new Properties(); 
private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer(); 

private StreamKafkaProducer(){ 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "com.abc.serializer.MySerializer"); 
    producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); 
} 

public static StreamKafkaProducer getStreamKafkaProducer(){ 
    return KAFKA_PRODUCER; 
} 

public void produce(String topic, VehicleTrip vehicleTrip){ 
    ProducerRecord<String,VehicleTrip> producerRecord = new ProducerRecord<>(topic,vehicleTrip); 
    producer.send(producerRecord); 
    //producer.close(); 
} 

public static void closeProducer(){ 
    producer.close(); 
} 

}
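
The custom value serializer com.abc.serializer.MySerializer is not shown in the question. As a rough sketch only (the class body below is an assumption based on the producer config), a Kryo-backed Kafka Serializer wired to the same VehicleTripKyroSerializer might look like this:

import java.io.ByteArrayOutputStream; 
import java.util.Map; 

import org.apache.kafka.common.serialization.Serializer; 

import com.esotericsoftware.kryo.Kryo; 
import com.esotericsoftware.kryo.io.Output; 

public class MySerializer implements Serializer<VehicleTrip> { 

    // Kryo instances are not thread-safe, so keep one per thread 
    private final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() { 
     @Override 
     protected Kryo initialValue() { 
      Kryo kryo = new Kryo(); 
      kryo.register(VehicleTrip.class, new VehicleTripKyroSerializer()); 
      return kryo; 
     } 
    }; 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) { 
    } 

    @Override 
    public byte[] serialize(String topic, VehicleTrip data) { 
     ByteArrayOutputStream bytes = new ByteArrayOutputStream(); 
     Output output = new Output(bytes); 
     kryos.get().writeObject(output, data); 
     output.close(); // flush Kryo's buffer into the byte stream 
     return bytes.toByteArray(); 
    } 

    @Override 
    public void close() { 
    } 
}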

Kryo serializer:

public class DataKyroSerializer extends Serializer<Data> implements Serializable { 

@Override 
public void write(Kryo kryo, Output output, Data data) { 
    output.writeLong(data.getStartedOn().getTime()); 
    output.writeLong(data.getEndedOn().getTime()); 
} 

@Override 
public Data read(Kryo kryo, Input input, Class<Data> aClass) { 
    Data data = new Data(); 
    data.setStartedOn(new Date(input.readLong())); 
    data.setEndedOn(new Date(input.readLong())); 
    return data; 
} 

}

I need to get the data back into the Data bean.

As per several articles, I need to provide a custom Scheme and make it part of the topology, but so far I have had no luck.

The custom Scheme and the bolt:

Scheme:

public class KryoScheme implements Scheme { 

    private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() { 
     protected Kryo initialValue() { 
      Kryo kryo = new Kryo(); 
      kryo.addDefaultSerializer(Data.class, new DataKyroSerializer()); 
      return kryo; 
     }; 
    }; 

    @Override 
    public List<Object> deserialize(ByteBuffer ser) { 
     return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class)); 
    } 

    @Override 
    public Fields getOutputFields() { 
     return new Fields("data"); 
    } 

} 

And the bolt:

public class AnalysisBolt implements IBasicBolt { 

private static final long serialVersionUID = 1L; 
private String topicname = null; 

public AnalysisBolt(String topicname) { 
    this.topicname = topicname; 
} 

public void prepare(Map stormConf, TopologyContext topologyContext) { 
    System.out.println("prepare"); 
} 

public void execute(Tuple input, BasicOutputCollector collector) { 
    System.out.println("execute"); 

    Fields fields = input.getFields(); 
    try { 

     JSONObject eventJson = (JSONObject) JSONSerializer.toJSON((String) input 
       .getValueByField(fields.get(1))); 
     String startTime = (String) eventJson.get("startedOn"); 
     String endTime = (String) eventJson.get("endedOn"); 
     String oid = (String) eventJson.get("_id"); 
     int vehicleId = (Integer) eventJson.get("vehicleId"); 
     //call method getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime) 

     System.out.println("==========="+oid+"| "+vehicleId+"| "+startTime+"| "+endTime); 

    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public void cleanup() { 
} 

public void declareOutputFields(OutputFieldsDeclarer declarer) { 
} 

public Map<String, Object> getComponentConfiguration() { 
    return null; 
} 

}

But when I submit the Storm topology, I am getting the following error:

java.lang.IllegalStateException: Spout 'kafkaspout' contains a 
non-serializable field of type com.abc.topology.KryoScheme$1, which 
was instantiated prior to topology creation. 
com.minda.iconnect.topology.KryoScheme$1 should be instantiated within 
the prepare method of 'kafkaspout at the earliest. 

I would appreciate help debugging the issue and guidance to the right path.

Thanks

Answers

Answer 1:

Your ThreadLocal is not serializable. The preferable solution would be to make your serializer both serializable and thread-safe. If that is not possible, then I see two alternatives, since there is no prepare method as you would get in a bolt:

  1. Declare it as static, which is inherently transient. 
  2. Declare it as transient and access it via a private get method, initializing the variable on first access (see the sketch after this list).
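
As an illustration of the second alternative, here is a sketch of the question's KryoScheme with the ThreadLocal made transient and lazily created through a private getter. This is a sketch of the suggested pattern, not tested drop-in code:

public class KryoScheme implements Scheme { 

    // transient: skipped by Java serialization when the topology is shipped 
    private transient ThreadLocal<Kryo> kryos; 

    // lazily (re)built on first access inside the worker JVM 
    private ThreadLocal<Kryo> getKryos() { 
     if (kryos == null) { 
      kryos = new ThreadLocal<Kryo>() { 
       @Override 
       protected Kryo initialValue() { 
        Kryo kryo = new Kryo(); 
        kryo.addDefaultSerializer(Data.class, new DataKyroSerializer()); 
        return kryo; 
       } 
      }; 
     } 
     return kryos; 
    } 

    @Override 
    public List<Object> deserialize(ByteBuffer ser) { 
     return Utils.tuple(getKryos().get().readObject(new ByteBufferInput(ser.array()), Data.class)); 
    } 

    @Override 
    public Fields getOutputFields() { 
     return new Fields("data"); 
    } 
}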
Answer 2:

In the lifecycle of a Storm topology, the topology is instantiated and then serialized to byte format to be stored in ZooKeeper, prior to the topology being executed. Within this step, if a spout or bolt in the topology has an initialized unserializable property, serialization will fail.

If there is a need for a field that is unserializable, initialize it within the bolt's or spout's prepare method, which is run after the topology is delivered to the worker.
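
For instance, a bolt could hold its Kryo instance as a transient field and only build it in prepare. This is a minimal sketch of that pattern, reusing names from the question; the class name is made up for illustration:

// Sketch: a bolt that defers creation of non-serializable state to 
// prepare(), which runs on the worker after the topology is deserialized. 
public class SafeAnalysisBolt extends BaseBasicBolt { 

    // not initialized here, and transient, so topology serialization succeeds 
    private transient Kryo kryo; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context) { 
     // runs on the worker JVM: the safe place for non-serializable state 
     kryo = new Kryo(); 
     kryo.addDefaultSerializer(Data.class, new DataKyroSerializer()); 
    } 

    @Override 
    public void execute(Tuple input, BasicOutputCollector collector) { 
     // use kryo here 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    } 
}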

Source: Best Practices for implementing Apache Storm