2017-08-01 71 views
2

我正嘗試在Azure託管的kubernetes集羣上設置多代理kafka。我有一個經紀人設置工作。對於多代理設置,目前我有一個管理kafka服務的zookeeper節點(3)的集合。我將kafka羣集部署爲複製因子爲3的複製控制器。即3個經紀人。我如何使用Zookeeper註冊三個經紀人,使他們在Zookeeper中註冊不同的IP地址?如何爲kubernetes上的Kafka multi broker設置指定公佈的偵聽器並在集羣外公開集羣?

在部署服務後啓動我的複製控制器,並在我的replication-controller yaml文件中使用羣集IP來指定兩個advertised.listeners,一個用於SSL,另一個用於PLAINTEXT。但是,在這種情況下,所有代理都使用相同的IP註冊並寫入副本失敗。我不想將每個代理作爲單獨的複製控制器/ pod和服務部署,因爲擴展成爲問題。我真的很感激任何想法/想法。

編輯1:

我還試圖集羣暴露到另一個VPC雲。我必須爲我使用advertised.listeners的客戶端公開SSL和PLAINTEXT端口。如果我使用複製因子爲3的statefulset,並讓kubernetes將主機名的標準主機名顯示爲主機名,則無法從外部客戶端解析這些主機名。我得到這個工作的唯一方法是使用/公開與每個代理對應的外部服務。但是,這並沒有規模。

+0

你是否曾經能夠解決這個問題? –

+0

嗨邁克,我還沒有解決卡夫卡的「易擴展性」問題。我正在使用statefulset(對於一個perisistent卷)中的每個代理,並將廣告主機作爲Ingress-IP-address(到我的kubernetes集羣):port,爲不同的代理提供不同的端口。這很難擴展,因爲需要爲更多經紀商開放新的端口,並且需要相應地更改broker.yaml中的廣告端口。 – Annu

回答

1

Kubernetes的概念是Statefulsets來解決這些問題。 statefulset的每個實例都有自己的DNS名稱,因此您可以通過dns名稱引用每個實例。

該概念更詳細地描述爲here。你也可以看看這個complete example

apiVersion: v1 
kind: Service 
metadata: 
    name: zk-headless 
    labels: 
    app: zk-headless 
spec: 
    ports: 
    - port: 2888 
    name: server 
    - port: 3888 
    name: leader-election 
    clusterIP: None 
    selector: 
    app: zk 
--- 
apiVersion: v1 
kind: ConfigMap 
metadata: 
    name: zk-config 
data: 
    ensemble: "zk-0;zk-1;zk-2" 
    jvm.heap: "2G" 
    tick: "2000" 
    init: "10" 
    sync: "5" 
    client.cnxns: "60" 
    snap.retain: "3" 
    purge.interval: "1" 
--- 
apiVersion: policy/v1beta1 
kind: PodDisruptionBudget 
metadata: 
    name: zk-budget 
spec: 
    selector: 
    matchLabels: 
     app: zk 
    minAvailable: 2 
--- 
apiVersion: apps/v1beta1 
kind: StatefulSet 
metadata: 
    name: zk 
spec: 
    serviceName: zk-headless 
    replicas: 3 
    template: 
    metadata: 
     labels: 
     app: zk 
     annotations: 
     pod.alpha.kubernetes.io/initialized: "true" 

    spec: 
     affinity: 
     podAntiAffinity: 
      requiredDuringSchedulingIgnoredDuringExecution: 
      - labelSelector: 
       matchExpressions: 
        - key: "app" 
        operator: In 
        values: 
        - zk-headless 
       topologyKey: "kubernetes.io/hostname" 
     containers: 
     - name: k8szk 
     imagePullPolicy: Always 
     image: gcr.io/google_samples/k8szk:v1 
     resources: 
      requests: 
      memory: "4Gi" 
      cpu: "1" 
     ports: 
     - containerPort: 2181 
      name: client 
     - containerPort: 2888 
      name: server 
     - containerPort: 3888 
      name: leader-election 
     env: 
     - name : ZK_ENSEMBLE 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: ensemble 
     - name : ZK_HEAP_SIZE 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: jvm.heap 
     - name : ZK_TICK_TIME 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: tick 
     - name : ZK_INIT_LIMIT 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: init 
     - name : ZK_SYNC_LIMIT 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: tick 
     - name : ZK_MAX_CLIENT_CNXNS 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: client.cnxns 
     - name: ZK_SNAP_RETAIN_COUNT 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: snap.retain 
     - name: ZK_PURGE_INTERVAL 
      valueFrom: 
      configMapKeyRef: 
       name: zk-config 
       key: purge.interval 
     - name: ZK_CLIENT_PORT 
      value: "2181" 
     - name: ZK_SERVER_PORT 
      value: "2888" 
     - name: ZK_ELECTION_PORT 
      value: "3888" 
     command: 
     - sh 
     - -c 
     - zkGenConfig.sh && zkServer.sh start-foreground 
     readinessProbe: 
      exec: 
      command: 
      - "zkOk.sh" 
      initialDelaySeconds: 15 
      timeoutSeconds: 5 
     livenessProbe: 
      exec: 
      command: 
      - "zkOk.sh" 
      initialDelaySeconds: 15 
      timeoutSeconds: 5 
     volumeMounts: 
     - name: datadir 
      mountPath: /var/lib/zookeeper 
     securityContext: 
     runAsUser: 1000 
     fsGroup: 1000 
    volumeClaimTemplates: 
    - metadata: 
     name: datadir 
    spec: 
     accessModes: [ "ReadWriteOnce" ] 
     resources: 
     requests: 
      storage: 20Gi 
+0

嗨盧卡斯,感謝您的迴應。然而,我關心的是,所有經紀人(作爲複製控制器的一部分)向zookeeper註冊了相同的advertised.listener主機和端口。如果他們是有組織的一部分,他們會註冊不同嗎?即使他們仍然在宣傳相同的主機和端口?我會嘗試這種方法,讓知道! – Annu

+0

Hi @Annu,這種方法是在Kubernetes上運行多動物園管理員和多經紀人Kafka集羣的經過驗證的方法。你可以看一下https://github.com/kubernetes/charts/tree/master/incubator/kafka,它顯示了一個很好的默認Kafka部署Kubernetes與頭盔。 –

+0

嗨@Luckas,我仍然在努力讓它與StatefulSet一起工作。我正在使用advertised.listeners來公開SSL和PLAINTEXT端口。我的問題是我需要能夠通過雲中不同的VPC訪問集羣。我對卡夫卡的理解是,生產者/消費者需要能夠直接連接(解析)經紀人以讀取/寫入數據。我不想使用三種不同的服務來揭露我的經紀人。但我沒有看到任何其他方式去實現它。 – Annu