2016-11-22 107 views
0

我試圖連接到IBM Bluemix消息轂和用java產生的消息之後的示例卡夫卡生產者超時問題連接到消息轂上Bluemix

https://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-kafka-ssl

producer.properties

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
acks=-1 
security.protocol=SASL_SSL 
sasl.mechanism=PLAIN 
ssl.protocol=TLSv1.2 
ssl.enabled.protocols=TLSv1.2 
ssl.truststore.password=changeit 
ssl.truststore.type=JKS 
ssl.endpoint.identification.algorithm=HTTPS 
ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/security/cacerts 

jaas.conf.template

KafkaClient { 
    com.ibm.messagehub.login.MessageHubLoginModule required 
    serviceName="kafka" 
    user="$USERNAME" 
    password="$PASSWORD"; 
}; 

Producer.java片斷

ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
       "MyTopic", 
       KEY.getBytes("UTF-8"), 
       "MESSAGE".getBytes("UTF-8")); 

     // Synchronously wait for a response from Message Hub/Kafka. 
     RecordMetadata m = kafkaProducer.send(record).get(); 

的問題是,我得到當我試圖讓未來RecordMetadata

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730) 
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483) 
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) 
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) 

閱讀先前的帖子在同一個超時異常話題

Timeout connecting to message-hub on Bluemix

,並提到了可能的原因是該主題不是created.I可以看到bluemix控制檯的話題和驗證,我稱之爲REST服務推送消息

RESTRequest restApi = new RESTRequest(getRestHost(),getApiKey()); 

String topics = restApi.get("/admin/topics", false); 

logger.info("Topics present in the system: " + topics); 

它返回之前獲取的主題列表我嘗試推送消息的主題,但我收到超時錯誤。

能有人幫我調試的問題

UPDATE

根據意見,我啓用了調試日誌,卡夫卡,這裏是日誌序列

2016-11-23 08:48:20.906 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient : Initialize connection to node -5 for sending metadata request 
2016-11-23 08:48:20.906 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient : Initiating connection to node -5 at kafka05-prod01.messagehub.services.us-south.bluemix.net:9093. 
2016-11-23 08:48:20.914 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Set SASL client state to SEND_HANDSHAKE_REQUEST 
2016-11-23 08:48:20.915 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Creating SaslClient: client=messagehub/[email protected];service=kafka;serviceHostname=kafka05-prod01.messagehub.services.us-south.bluemix.net;mechs=[PLAIN] 
2016-11-23 08:48:20.979 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--5.bytes-sent 
2016-11-23 08:48:20.980 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--5.bytes-received 
2016-11-23 08:48:20.980 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--5.latency 
2016-11-23 08:48:20.982 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient : Completed connection to node -5 
2016-11-23 08:48:21.080 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 

2016-11-23 08:48:21.264 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Set SASL client state to INITIAL 
2016-11-23 08:48:21.265 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Set SASL client state to INTERMEDIATE 
2016-11-23 08:48:21.284 DEBUG 72885 --- [sage-hub-sample] o.apache.kafka.common.network.Selector : Connection with kafka05-prod01.messagehub.services.us-south.bluemix.net/23.246.202.55 disconnected 

java.io.EOFException: null 
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:488) 
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) 
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:239) 
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:182) 
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64) 
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318) 
at org.apache.kafka.common.network.Selector.poll(Selector.java:283) 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) 
at java.lang.Thread.run(Thread.java:745) 

所有以上記錄爲5個經紀人中的任何一個。這些都是調試語句,所以我不確定它們是否是錯誤。

-Tatha

回答

1

我可以看到一對夫婦的問題與你的JAAS文件:

  • 你指的是使用卡夫卡0.10.0.X所以你不應該使用的舊消息中心登錄模塊的樣品卡夫卡0.9。 因此,通過「org.apache.kafka.common.security.plain.PlainLoginModule」

  • 用戶名的領域被稱爲「用戶名」,而不是「用戶更換「com.ibm.messagehub.login.MessageHubLoginModule」 」。 它應該是用戶名=「$ USERNAME」

+0

謝謝@Mickael,那些修改問題的更改 – Tatha

0

請啓用log4j的DEBUG水平和卡夫卡客戶跟蹤可幫助識別任何連接問題。 嘗試將最後一行更改爲 log4j.logger.org.apache.kafka=DEBUGlog4j.properties文件中,重建並查看詳細輸出。隨時重新發布。

+0

感謝您關注此事。我用apache.kafka調試日誌更新了Q.看起來像你提到的網絡/連接問題,但我不知道爲什麼會發生這種情況 – Tatha