2014-09-29

Kafka in a box: unable to send messages from the host

I created a Vagrant/Ansible playbook to build a single-node Kafka VM.

The idea is to provide some flexibility when prototyping: if we want a quick-and-dirty Kafka message queue, we can simply git clone [my 'kafka in a box' repo], cd .., vagrant up.

Here is what I have done so far:

Vagrantfile:

VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|

  config.vm.box = "hashicorp/precise64"

  # Expose the Kafka broker port on the host
  config.vm.network "forwarded_port", guest: 9092, host: 9092

  config.vm.provider "virtualbox" do |vb|
    vb.customize ["modifyvm", :id, "--memory", "2048"]
  end

  config.vm.provision "ansible" do |ansible|
    ansible.playbook = "kafkaPlaybook.yml"
  end

end

...and the Ansible kafkaPlaybook.yml file:

---
- hosts: all
  user: vagrant
  sudo: True

  tasks:

    - name: install linux packages
      action: apt update_cache=yes pkg={{item}} state=installed
      with_items:
        - vim
        - openjdk-7-jdk

    - name: make /usr/local/kafka directory
      shell: "mkdir /usr/local/kafka"

    - name: download kafka (the link is from an apache mirror)
      get_url: url=http://apache.spinellicreations.com/kafka/0.8.1.1/kafka-0.8.1.1-src.tgz dest=/usr/local/kafka/kafka-0.8.1.1-src.tgz mode=0440

    - name: untar file
      shell: "tar -xvf /usr/local/kafka/kafka-0.8.1.1-src.tgz -C /usr/local/kafka"

    - name: build kafka with gradle
      shell: "cd /usr/local/kafka/kafka-0.8.1.1-src && ./gradlew jar"

When I run vagrant up, the box is provisioned. I am able to vagrant ssh and run a basic producer/consumer test locally, e.g.

cd /usr/local/kafka/kafka-0.8.1.1-src 
bin/zookeeper-server-start.sh config/zookeeper.properties  # start zookeeper
bin/kafka-server-start.sh config/server.properties  # start kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  # start a producer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  # start a consumer

When I type messages into the producer window, they appear in the consumer window. Great.

I then tried to connect to Kafka from the host using the kafka-python package:

>>> from kafka import KafkaClient, SimpleProducer 
>>> kafka = KafkaClient("127.0.0.1:9092", timeout=120) 
>>> kafka.ensure_topic_exists('turkey') 
No handlers could be found for logger "kafka" 
>>> kafka.ensure_topic_exists('turkey') 
>>> producer = SimpleProducer(kafka) 
>>> producer.send_messages("turkey", "gobble gobble") 
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/producer.py", line 261, in send_messages
    return super(SimpleProducer, self).send_messages(topic, partition, *msg)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/producer.py", line 188, in send_messages
    timeout=self.ack_timeout)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 312, in send_produce_request
    resps = self._send_broker_aware_request(payloads, encoder, decoder)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 148, in _send_broker_aware_request
    conn = self._get_conn(broker.host, broker.port)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 55, in _get_conn
    timeout=self.timeout
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 60, in __init__
    self.reinit()
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 195, in reinit
    self._raise_connection_error()
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 75, in _raise_connection_error
    raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
kafka.common.ConnectionError: Kafka @ precise64:9092 went away

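kafka.ensure_topic_exists is called twice. The first call returns a warning and then creates the topic, so I can see that Python is communicating with Kafka on port 9092. However, I cannot send messages to the queue.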

Can you see what I am doing wrong?

Answer

advertised.host.name and advertised.port need to be set in config/server.properties. I added the following two tasks to the playbook:

- name: uncomment and set advertised.host.name
  lineinfile: dest=/usr/local/kafka/kafka-0.8.1.1-src/config/server.properties
              regexp='^#advertised.host.name=<hostname routable by clients>'
              insertafter='^#advertised.host.name=<hostname routable by clients>'
              line='advertised.host.name=localhost'
              state=present

- name: uncomment and set advertised.port line
  lineinfile: dest=/usr/local/kafka/kafka-0.8.1.1-src/config/server.properties
              regexp='^#advertised.port=<port accessible by clients>'
              insertafter='^#advertised.port=<port accessible by clients>'
              line='advertised.port=9092'
              state=present
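
For readers less familiar with lineinfile, the effect of those two tasks on server.properties can be approximated in plain Python. This is only a sketch of the idea, not how Ansible implements the module: the helper name set_property and the sample file contents are assumptions made for illustration, and the helper replaces the first matching line or appends one if nothing matches.

```python
import re


def set_property(text, key, value):
    """Uncomment-and-set `key` in the body of a Java-style properties file.

    Hypothetical helper for illustration only: it replaces the first line
    that looks like `key=...` or `#key=...`, and appends `key=value` at the
    end when no such line exists.
    """
    pattern = re.compile(
        r"^#?[ \t]*" + re.escape(key) + r"[ \t]*=.*$", re.MULTILINE
    )
    new_line = "{0}={1}".format(key, value)
    if pattern.search(text):
        # A function replacement avoids backslash escapes in `value`
        # being interpreted by re.sub.
        return pattern.sub(lambda match: new_line, text, count=1)
    if text and not text.endswith("\n"):
        text += "\n"
    return text + new_line + "\n"


# Assumed excerpt of config/server.properties before provisioning.
sample = (
    "port=9092\n"
    "#advertised.host.name=<hostname routable by clients>\n"
    "#advertised.port=<port accessible by clients>\n"
)

updated = set_property(sample, "advertised.host.name", "localhost")
updated = set_property(updated, "advertised.port", "9092")
print(updated)
```

The two commented placeholder lines end up as advertised.host.name=localhost and advertised.port=9092, which is the state the playbook tasks aim for.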

With that change, it is now possible to provision a single-node Kafka cluster:

git clone https://github.com/alexwoolford/vagrantKafkaBox 
cd vagrantKafkaBox 
vagrant up 
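
The traceback in the question ends with "Kafka @ precise64:9092 went away" even though the client was given 127.0.0.1:9092. That is consistent with the broker advertising its own hostname in its metadata, a name the host machine may not be able to resolve. A minimal sketch for checking an address from the host, using only the standard library (check_endpoint is a hypothetical helper, not part of kafka-python):

```python
import socket


def check_endpoint(host, port, timeout=3.0):
    """Classify a TCP endpoint as 'ok', 'unresolvable', or 'unreachable'.

    Hypothetical diagnostic helper: it only tests name resolution and the
    TCP connection, and does not speak the Kafka protocol.
    """
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
    except socket.gaierror:
        # The hostname could not be resolved from this machine.
        return "unresolvable"
    except socket.error:
        # Resolved, but the connection was refused or timed out.
        return "unreachable"
    conn.close()
    return "ok"


# Example calls to run on the host after `vagrant up`:
#   check_endpoint("localhost", 9092)   # the address advertised after the fix
#   check_endpoint("precise64", 9092)   # the name that appears in the traceback
```

If the advertised name is not reachable from the host, producers will fail in the way shown in the question even though the initial connection to the forwarded port works.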

If I were starting over, I would probably use Wirbelsturm to provision a lab Kafka instead.