2017-03-05 137 views
0

我寫的自定義分區程序,無論是類同一個項目,但不知道爲什麼我收到以下error.Please幫我this.Below程序卡夫卡生產program.When我運行程序我得到誤差SensorPartitioner類未找到。卡夫卡customet partioner錯誤

錯誤:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value SensorPartitioner for configuration partitioner.class: Class SensorPartitioner could not be found. 
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:671) 
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:418) 
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56) 
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:63) 
    at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:338) 
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188) 
    at kafka.test.main(test.java:19) 



package kafka; 

import org.apache.kafka.clients.producer.*; 
import java.io.BufferedReader; 
import java.io.FileReader; 
import java.util.Properties; 
public class test { 
public static void main(String[] args) throws Exception{ 
    System.out.println("new"); 
    String topicName = "partitionTopic"; 
    String sCurrentLine; 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092,localhost:9093"); 
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("partitioner.class", "SensorPartitioner"); 
    props.put("speed.sensor.name", "core"); 
    Producer<String, String> producer = new KafkaProducer <String, String>(props); 

      try{ 

    BufferedReader br = null; 
    br = new BufferedReader(new FileReader("datagen_10.txt")); 
      //String arr1=" "; 
      while ((sCurrentLine = br.readLine()) != null) { 

       System.out.println(sCurrentLine); 
       String[] arr11 = sCurrentLine.split(","); 
       String key=arr11[0]; 
      ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,key,sCurrentLine); 
      RecordMetadata metadata = producer.send(record).get(); 
      System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset()); 
      System.out.println("SynchronousProducer Completed with success."); 
     } 
      br.close(); 
     }catch (Exception e) { 
      e.printStackTrace(); 
      System.out.println("SynchronousProducer failed with an exception"); 
     }finally{ 
      producer.close(); 
     } 
    } 
} 

below program is custom partitioner program. 


package kafka; 
import java.util.*; 
import org.apache.kafka.clients.producer.*; 
import org.apache.kafka.common.*; 
import org.apache.kafka.common.utils.*; 
import org.apache.kafka.common.record.*; 

public class SensorPartitioner implements Partitioner { 

    private String speedSensorName; 

    public void configure(Map<String, ?> configs) { 
      speedSensorName = configs.get("speed.sensor.name").toString(); 

    } 

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 

      List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 
      int numPartitions = partitions.size(); 
      int sp = (int)Math.abs(numPartitions*0.3); 
      int p=0; 

      if ((keyBytes == null) || (!(key instanceof String))) 
       throw new InvalidRecordException("All messages must have sensor name as key"); 

      if (((String)key).equals(speedSensorName)) 
       p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp; 
      else 
       p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions-sp) + sp ; 

       System.out.println("Key = " + (String)key + " Partition = " + p); 
       return p; 
    } 
     public void close() {} 

} 
+0

嘗試'props.put( 「partitioner.class」, 「kafka.SensorPartitioner」);' – amethystic

回答

-1

使用絕對路徑SensorPartitioner類,如下:

props.put("partitioner.class", "com.subpackage.subsubpackage.SensorPartitioner");