I installed Apache Kafka (Confluent) on CentOS 7, and I am trying to run the Kafka Connect FileStream connector in distributed mode, but I get the following error:
[2017-08-10 05:26:27,355] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "internal.key.converter" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:463)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:197)
at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:289)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:65)
This is now resolved by updating workers.properties as described at http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-config
Command used:
/home/arun/kafka/confluent-3.3.0/bin/connect-distributed.sh ../../../properties/file-stream-demo-distributed.properties
Filestream properties file (workers.properties):
name=file-stream-demo-distributed
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/demo-file.txt
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
group.id=""
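As a side note on the file above: in distributed mode the connector-specific settings (name, connector.class, tasks.max, file) normally do not go into the worker properties file at all; they are submitted to a running worker through the Connect REST API. A sketch of what that might look like, assuming the worker is listening on the default REST port 8083 and the target topic is demo-2-distributed (the `topic` key is required by the FileStreamSource connector):

```json
{
  "name": "file-stream-demo-distributed",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/demo-file.txt",
    "topic": "demo-2-distributed"
  }
}
```

Saved as, say, connector.json, this could be posted with `curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors`.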
After adding the properties below, the command runs without any errors.
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
group.id=""
However, when I now run the consumer command, I cannot see the messages from /tmp/demo-file.txt. Is there a way to check whether the messages were published to the Kafka topic and its partitions?
kafka-console-consumer --zookeeper localhost:2181 --topic demo-2-distributed --from-beginning
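One way to verify that records actually landed on a topic, independent of the console consumer, is to inspect the topic's end offsets. A sketch, assuming the stock Kafka CLI tools are on the PATH and the broker is at localhost:9092:

```shell
# List topics to confirm the topic exists
kafka-topics --zookeeper localhost:2181 --list

# Print the latest offset of each partition; a non-zero value
# means records have been written to that partition
kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic demo-2-distributed \
  --time -1
```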
I believe I am missing something really basic here. Can anyone help?
What do your standalone worker properties look like? The "internal.key.converter" configuration property (among others) must be defined. –
Thanks for the response. I have updated the question with the new properties. Could you advise? Also, may I ask what the difference is between config.storage.topic, offset.storage.topic, and status.storage.topic? They all seem to refer to topics that Kafka Connect publishes to. Please correct me if I'm wrong. –
How many brokers are running in your Kafka cluster? If fewer than 3, you must specify the '...replication.factor' properties; see http://docs.confluent.io/current/connect/userguide.html#distributed-worker-configuration –
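Following the comment above, the replication-factor settings referred to might look like this in the worker properties file (a sketch; a value of 1 is only appropriate for a test cluster with fewer than 3 brokers):

```properties
# Hypothetical values for a small test cluster
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
```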