2017-08-10 148 views
0

我在CentOS 7(融合)安裝了Apache卡夫卡,我試圖運行FILESTREAM卡夫卡在分佈式模式連接,但我得到以下錯誤:卡夫卡連接問題

[2017-08-10 05:26:27,355] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290) 
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "internal.key.converter" which has no default value. 
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:463) 
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) 
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62) 
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75) 
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:197) 
at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:289) 
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:65) 

現在通過更新工作人員解決的.properties如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-config

Command used:

/home/arun/kafka/confluent-3.3.0/bin/connect-distributed.sh ../../../properties/file-stream-demo-distributed.properties 

Filestream properties file (workers.properties):

01中提到
name=file-stream-demo-distributed 
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector 
tasks.max=1 
file=/tmp/demo-file.txt 
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094 
config.storage.topic=demo-2-distributed 
offset.storage.topic=demo-2-distributed 
status.storage.topic=demo-2-distributed 
key.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=true 
value.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter.schemas.enable=true 
internal.key.converter=org.apache.kafka.connect.json.JsonConverter 
internal.key.converter.schemas.enable=false 
internal.value.converter=org.apache.kafka.connect.json.JsonConverter 
internal.value.converter.schemas.enable=false 
group.id="" 

我加了下面的屬性和命令經歷沒有任何錯誤。

bootstrap.servers=localhost:9092,localhost:9093,localhost:9094 
config.storage.topic=demo-2-distributed 
offset.storage.topic=demo-2-distributed 
status.storage.topic=demo-2-distributed 
group.id="" 

但是,現在當我運行consumer命令時,我無法在/tmp/demo-file.txt中看到消息。請讓我知道是否有方法可以檢查郵件是否發佈到kafka主題和分區?

kafka-console-consumer --zookeeper localhost:2181 --topic demo-2-distributed --from-beginning 

我相信我錯過了這裏真正基本的東西。有人可以幫忙嗎?

+0

什麼是獨立的工人屬性?必須定義「internal.key.converter」配置屬性(以及其他)。 –

+0

謝謝你的迴應。我用更新的屬性更新了這個問題。你能建議嗎?另外,我可以請知道之間config.storage.topic,offset.storage.topic和status.storage.topic區別?似乎所有人都提到卡夫卡發佈的主題。如果錯誤,請糾正我。 –

+0

您的Kafka集羣中有多少家經紀商正在運行?如果小於3,則必須指定'... replication.factor'屬性;看到http://docs.confluent.io/current/connect/userguide.html#distributed-worker-configuration –

回答

1

您需要定義獨特的主題卡夫卡連接架構來存儲它的配置,偏移和狀態。

在你workers.properties文件來改變這些參數來像下面這樣:

config.storage.topic=demo-2-distributed-config 
offset.storage.topic=demo-2-distributed-offset 
status.storage.topic=demo-2-distributed-status 

這些主題使用存儲狀態和連接配置元數據和用於存儲信息任何的連接器上運行的連接器。不要在這三個主題中的任何一個上使用控制檯消費者,並期望看到這些消息。

的消息被存儲在與被稱爲「主題」的參數的連接器配置JSON配置的話題。

示例文件灌入config.json文件

{ 
    "name": "MyFileSink", 
    "config": { 
     "topics": "mytopic", 
     "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", 
     "tasks.max": 1, 
     "key.converter": "org.apache.kafka.connect.storage.StringConverter", 
     "value.converter": "org.apache.kafka.connect.storage.StringConverter", 
     "file": "/tmp/demo-file.txt" 
    } 
} 

一旦分佈式工人正在運行,你需要在配置文件適用於它使用curl,像這樣:

curl -X POST -H "Content-Type: application/json" --data @file-sink-config.json http://localhost:8083/connectors 

的後config將安全地存儲在您爲所有分佈式工作人員使用的配置主題中。確保配置主題(以及狀態和偏移主題)不會過期消息,否則當連接器配置發生變化時,將導致失去連接器配置。

+0

謝謝你的建議!那麼我需要放置我的config.json文件?應該將json文件傳遞給連接分佈式腳本?我想在分佈式模式下,應該只有一個工作人員屬性文件 –

+0

我可以在運行curl命令後運行,如http://docs.confluent.io/2.0.0/connect/userguide.html中所述 –