2017-09-28 790 views
-1

我想將Kafka消息寫入MySQL數據庫。在this鏈接中有一個示例。在那個例子中,apache flume用於消費消息並將其寫入MySQL。我使用相同的代碼,當我運行flume-ng agentevent始終成爲null將Kafka消息流式傳輸到MySQL數據庫

而且我flume.conf.properties文件是:

agent.sources=kafkaSrc 
agent.channels=channel1 
agent.sinks=jdbcSink 

agent.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel 
agent.channels.channel1.brokerList=localhost:9092 
agent.channels.channel1.topic=kafkachannel 
agent.channels.channel1.zookeeperConnect=localhost:2181 
agent.channels.channel1.capacity=10000 
agent.channels.channel1.transactionCapacity=1000 
agent.channels.channel1.parseAsFlumeEvent=false 


agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource 
agent.sources.kafkaSrc.channels = channel1 
agent.sources.kafkaSrc.zookeeperConnect = localhost:2181 
agent.sources.kafkaSrc.topic = kafka-mysql 

agent.sinks.jdbcSink.type = com.stratio.ingestion.sink.jdbc.JDBCSink 
agent.sinks.jdbcSink.connectionString = jdbc:mysql://127.0.0.1:3306/test?useSSL=false 
agent.sinks.jdbcSink.username=root 
agent.sinks.jdbcSink.password=pass 
agent.sinks.jdbcSink.batchSize = 10 
agent.sinks.jdbcSink.channel =channel1 
agent.sinks.jdbcSink.sqlDialect=MYSQL 
agent.sinks.jdbcSink.driver=com.mysql.jdbc.Driver 
agent.sinks.jdbcSink.sql=INSERT INTO kafkamsg(msg) VALUES(${body:varchar}) 

我哪裏錯了?

謝謝。

回答

0

在我的退役示例中,水槽會聽到kafka爲kafka-mysql的話題。但是,此代碼適用於kafkachannel主題。所以我們需要產生消息到kafkachannel的話題,我不知道爲什麼。