Kafka on Kubernetes cannot produce/consume topics (ClosedChannelException, ErrorLoggingCallback)

I am running 1 Kafka broker and 3 ZooKeeper servers in Docker on Kubernetes, following this instruction. I cannot produce or consume topics from outside the pod (Docker container).

bin/kafka-console-producer.sh --broker-list 1.2.3.4:9092 --topic test

[2016-06-11 15:14:46,889] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test-0 

bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning 

[2016-06-11 15:15:58,985] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(1001,kafka-service,9092)] failed (kafka.client.ClientUtils$) 
java.nio.channels.ClosedChannelException 
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) 
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) 
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124) 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) 
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) 
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) 
[2016-06-11 15:15:58,992] WARN [console-consumer-66869_tattoo-NV49C-1465629357799-ce1529da-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) 
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(1001,kafka-service,9092))] failed 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) 
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) 
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) 
Caused by: java.nio.channels.ClosedChannelException 
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) 
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) 
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124) 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) 
    ... 3 more 


Kafka log:

[2016-06-11 07:47:58,269] INFO [Kafka Server 1001], started (kafka.server.KafkaServer)
[2016-06-11 07:53:50,404] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-06-11 07:53:50,443] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2016-06-11 07:53:50,458] INFO Created log for partition [test,0] in /kafka/kafka-logs-kafka-controller-3rsv3 with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2016-06-11 07:53:50,459] INFO Partition [test,0] on broker 1001: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition)
[2016-06-11 07:57:57,955] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

And config/server.properties:

broker.id=-1 
log.dirs=/kafka/kafka-logs-kafka-controller-3rsv3 
num.partitions=1 
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 
zookeeper.connection.timeout.ms=6000 

service.port.9092.tcp.addr=10.254.68.65 
service.port.9092.tcp.proto=tcp 
service.service.port.kafka.port=9092 
service.service.port=9092 
service.port=tcp://10.254.68.65:9092 
service.port.9092.tcp.port=9092 
version=0.10.0.0 
service.service.host=10.254.68.65 
port=9092 
advertised.host.name=kafka-service 
service.port.9092.tcp=tcp://10.254.68.65:9092 
advertised.port=9092 
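
To confirm what the broker actually registered, one can read its znode from ZooKeeper (the broker id 1001 comes from the logs above; zookeeper-shell.sh ships with the Kafka distribution):

bin/zookeeper-shell.sh 5.6.7.8:2181 get /brokers/ids/1001
# should print the broker's registration JSON, including the advertised
# endpoint kafka-service:9092 that clients must be able to resolve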

But I can run bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test and bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning if I am inside the pod (Docker container).
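
For completeness, here is a sketch of running the same producer from outside the container via kubectl exec, assuming the image's working directory contains the Kafka scripts (as it does when running them interactively above); the pod name is hypothetical, look yours up first:

kubectl get pods    # find the kafka pod name, e.g. kafka-controller2-xxxxx
kubectl exec -it kafka-controller2-xxxxx -- \
  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test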

And when connecting to the ZooKeeper service, I can create and list topics normally:

bin/kafka-topics.sh --describe --zookeeper 5.6.7.8:2181 --topic test 
Topic:test PartitionCount:1 ReplicationFactor:1 Configs: 
    Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001 

And my YAML file for creating the Kafka replication controller and service:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service2
  labels:
    app: kafka2
spec:
  clusterIP: None
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka2
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: kafka-controller2
spec:
  replicas: 1
  selector:
    app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: kafka2
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka-service2
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181

Answer

Kafka registers its service name with ZooKeeper. Consuming/producing messages requires access to that service name (here, the DNS records for zookeeper-1, zookeeper-2 and zookeeper-3), and those records are only resolvable through Kubernetes' DNS. So only applications running inside Kubernetes can access my Kafka. Therefore I can neither use the Kafka service's external IP nor port-forward the kafka pod to localhost and access it that way.
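
A quick way to check that the advertised name resolves only inside the cluster is to compare DNS lookups from a throwaway pod and from the local machine; the pod name dns-test below is made up for illustration:

# inside the cluster: kube-dns resolves the service name
kubectl run -it --rm dns-test --image=busybox --restart=Never -- nslookup kafka-service
# outside the cluster: the same lookup fails, the name exists only in cluster DNS
nslookup kafka-service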

But then why can I create, list and describe topics from outside the Kubernetes cluster? I suppose it is because ZooKeeper can perform those operations by itself. Consuming/producing messages, however, requires access to the ADVERTISED_HOST_NAME provided by Kafka.
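
One common workaround (a sketch, not part of the original answer) is to make the broker advertise an address that outside clients can reach, for example a node IP exposed through a NodePort service; the name kafka-external and port 30092 are made up for illustration:

apiVersion: v1
kind: Service
metadata:
  name: kafka-external        # hypothetical name
spec:
  type: NodePort              # exposes the port on every node's IP
  selector:
    app: kafka2               # matches the pods created by the controller above
  ports:
  - port: 9092
    targetPort: 9092
    nodePort: 30092           # illustrative; must fall in the cluster's NodePort range

The broker would then need KAFKA_ADVERTISED_HOST_NAME set to a node's externally reachable IP and KAFKA_ADVERTISED_PORT set to "30092", so that the metadata it returns points at an address external clients can actually connect to.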

Did you find a solution to produce/consume messages from outside the Kubernetes cluster? – lucy

I haven't tried it yet. But maybe you should use an Ingress, https://kubernetes.io/docs/concepts/services-networking/ingress/, which allows connections from outside the cluster. – Pao