
I am using the Kafka Connect Cassandra Sink to push data from Kafka to Cassandra. Kafka continuously receives data from Logstash in JSON format, and I want to push that same data on to Cassandra through the connector.
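For context, a minimal sketch of the Logstash output that could produce such JSON messages (the broker address and topic name here are assumptions, not taken from the original setup):

    output {
      kafka {
        bootstrap_servers => "localhost:9092"   # assumed broker address
        topic_id          => "orders-topic"     # assumed topic name
        codec             => "json"             # ship each event as plain JSON
      }
    }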

I followed this tutorial: http://docs.datamountaineer.com/en/latest/cassandra-sink.html, and I changed the format from Avro to JSON in the Schema Registry properties. After that I created a new connector instance, and I keep getting the same error:

[2017-02-10 09:01:49,978] ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:390) 
java.lang.RuntimeException: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct 
     at com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58) 
     at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:67) 
     at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:48) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.handleTry(CassandraJsonWriter.scala:40) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.insert(CassandraJsonWriter.scala:144) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.write(CassandraJsonWriter.scala:104) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72) 
     at scala.Option.foreach(Option.scala:257) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask.put(CassandraSinkTask.scala:72) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct 
     at com.datamountaineer.streamreactor.connect.schemas.ConverterUtil$class.convert(ConverterUtil.scala:59) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.convert(CassandraJsonWriter.scala:40) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$toJson(CassandraJsonWriter.scala:154) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(CassandraJsonWriter.scala:127) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125) 
     at com.datamountaineer.streamreactor.connect.concurrent.ExecutorExtension$RunnableWrapper$$anon$1.run(ExecutorExtension.scala:30) 
     ... 3 more 
[2017-02-10 09:01:49,981] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:391) 

I have tried another approach, sending Avro from Logstash to Kafka, but I don't know how to handle properties like @timestamp or @version. In other words, I don't know how to create an Avro schema that will successfully parse those two fields, because I keep getting errors about the schema not recognizing the @ in the attribute names.
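For what it's worth, Avro field names must match [A-Za-z_][A-Za-z0-9_]*, so @timestamp and @version can never appear in an Avro schema as-is. One common workaround, sketched here (not part of the original pipeline), is to rename those fields with Logstash's mutate filter before the events reach Kafka:

    filter {
      mutate {
        # Avro does not allow "@" in field names, so strip the prefix
        rename => {
          "@timestamp" => "timestamp"
          "@version"   => "version"
        }
      }
    }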

Can somebody give me advice, or suggest another sink? It doesn't have to be Confluent; I just need a sink that can get this data from Kafka into Cassandra. Thanks, fellows!


What is the source of the Kafka topic? Does it come from Connect or from somewhere else? Is it Avro-serialized or JSON in Kafka? Can you post your sink connector configuration? –

Answer


You have to change some properties in etc/schema-registry/connect-avro-distributed.properties. Comment out these lines:

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081

and add these lines:

    key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
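With schemas.enable=false, the JsonConverter accepts plain JSON records, which is what Logstash produces. With schemas.enable=true, it would instead expect every message to carry Kafka Connect's schema/payload envelope, roughly like this (the field names here are illustrative only):

    {
      "schema": {
        "type": "struct",
        "optional": false,
        "fields": [
          { "type": "string", "field": "id" },
          { "type": "string", "field": "timestamp" }
        ]
      },
      "payload": { "id": "1", "timestamp": "2017-02-10T09:01:49Z" }
    }

After editing the file, restart the Connect worker so the new converters take effect, e.g. with the Confluent distribution (path assumed): ./bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties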