2017-06-18 154 views
0

我對卡夫卡非常陌生,並開始致力於生產者 - 消費者模型。
我爲Kafka製作人編寫了一個程序,它將從MySQL讀取表格,並且我正在命令提示符下閱讀這些信息。
問題是我只能看到表格最後一列的數據。我的卡夫卡生產者類中的錯誤

MySQL表結構及數據:

+----------+---------+----------------+ 
| dept  | empName | salary(double) | 
+----------+---------+----------------+ 
| Engg  | Fred | 2000   | 
| Engg  | Bob  | 3000   | 
| Engg  | Joe  | 1000   | 
| Arts  | Jack | 5000   | 
| Commerce | Jill | 2400   | 
| Arts  | James | 3000   | 
| Commerce | Rob  | 8700   | 
+----------+---------+----------------+ 

卡夫卡製作類:

[[email protected] kafka_2.11-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbrec --from-beginning 
[2017-06-18 10:23:22,428] WARN Error while fetching metadata with correlation id 2 : {dbrec=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 
[2017-06-18 10:23:22,628] WARN The following subscribed topics are not assigned to any members in the group console-consumer-33197 : [dbrec] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 

2000.0 
3000.0 
1000.0 
5000.0 
2400.0 
3000.0 
8700.0 

我剛開始從最後一列中的數據:

package com.kafka.producer; 

import java.sql.Connection; 
import java.sql.DriverManager; 
import java.sql.PreparedStatement; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.util.Properties; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

@SuppressWarnings("deprecation") 
public class KafkaMySqlProducer { 
    public static void main(String[] args) { 

     Connection con; 
     PreparedStatement pstmnt; 
     ResultSet rs; 

     Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("serializer.class","kafka.serializer.StringEncoder"); 
     props.put("metadata.broker.list","localhost:9092"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer producer = new Producer(config); 

     try { 
      Class.forName("com.mysql.jdbc.Driver"); 
      con = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb","root","cloudera"); 
      pstmnt = con.prepareStatement("select * from department"); 
      rs = pstmnt.executeQuery(); 
      while(rs.next()) { 
       String name = rs.getString(1); 
       String department = rs.getString(2); 
       String salary = " " + rs.getDouble(3); 
       producer.send(new KeyedMessage("dbrec", name, department, salary)); 
      } 
      con.close(); 
     } catch (ClassNotFoundException e) { 
      e.printStackTrace(); 
     } catch (SQLException e) { 
      e.printStackTrace(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

    } 
} 

從我的消費輸出的表格。
任何人都可以告訴我我在這裏做什麼錯誤?

+0

消耗message如果你開始與卡夫卡一個新的項目,最好開始使用新的生產國和消費類。我看到你正在使用生產者連接到Zookeeper的舊版本獲取關於經紀人的信息,而不是直接向其中的一個。 – ppatierno

回答

2

具有4個參數的KeyedMessage構造函數的最後一個參數是實際的消息。

該類不僅接受所有值的列表。

具體而言,您提供的參數是​​。你只能從topic

+1

好吧..糾正它這樣。 (新的KeyedMessage(「dbrec」,name +「」+ department +「」+ salary));現在正在工作。 – Sidhartha

+0

另外我看到這兩行不推薦使用的消息。 \t \t ProducerConfig config = new ProducerConfig(props); \t \t Producer producer = new Producer(config); 您能告訴我這些方法有哪些新選項,以便我可以實施它們。 – Sidhartha

+0

不確定,但JavaDoc可能會爲你解答 –

相關問題