我寫的自定義分區程序,無論是類同一個項目,但不知道爲什麼我收到以下error.Please幫我this.Below程序卡夫卡生產program.When我運行程序我得到誤差SensorPartitioner類未找到。卡夫卡customet partioner錯誤
錯誤:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value SensorPartitioner for configuration partitioner.class: Class SensorPartitioner could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:671)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:418)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:63)
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:338)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
at kafka.test.main(test.java:19)
package kafka;
import org.apache.kafka.clients.producer.*;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
public class test {
public static void main(String[] args) throws Exception{
System.out.println("new");
String topicName = "partitionTopic";
String sCurrentLine;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "SensorPartitioner");
props.put("speed.sensor.name", "core");
Producer<String, String> producer = new KafkaProducer <String, String>(props);
try{
BufferedReader br = null;
br = new BufferedReader(new FileReader("datagen_10.txt"));
//String arr1=" ";
while ((sCurrentLine = br.readLine()) != null) {
System.out.println(sCurrentLine);
String[] arr11 = sCurrentLine.split(",");
String key=arr11[0];
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,key,sCurrentLine);
RecordMetadata metadata = producer.send(record).get();
System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset());
System.out.println("SynchronousProducer Completed with success.");
}
br.close();
}catch (Exception e) {
e.printStackTrace();
System.out.println("SynchronousProducer failed with an exception");
}finally{
producer.close();
}
}
}
below program is custom partitioner program.
package kafka;
import java.util.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.utils.*;
import org.apache.kafka.common.record.*;
public class SensorPartitioner implements Partitioner {
private String speedSensorName;
public void configure(Map<String, ?> configs) {
speedSensorName = configs.get("speed.sensor.name").toString();
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int sp = (int)Math.abs(numPartitions*0.3);
int p=0;
if ((keyBytes == null) || (!(key instanceof String)))
throw new InvalidRecordException("All messages must have sensor name as key");
if (((String)key).equals(speedSensorName))
p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp;
else
p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions-sp) + sp ;
System.out.println("Key = " + (String)key + " Partition = " + p);
return p;
}
public void close() {}
}
嘗試'props.put( 「partitioner.class」, 「kafka.SensorPartitioner」);' – amethystic