I created a Vagrant/Ansible playbook to build a single-node Kafka VM.

The idea is to give us some flexibility when prototyping: if we want a quick-and-dirty Kafka message queue, we can simply `git clone [my 'kafka in a box' repo]`, `cd` into it, and `vagrant up`.
Here's what I have so far.

The `Vagrantfile`:
```ruby
VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  config.vm.box = "hashicorp/precise64"
  config.vm.network "forwarded_port", guest: 9092, host: 9092
  config.vm.provider "virtualbox" do |vb|
    vb.customize ["modifyvm", :id, "--memory", "2048"]
  end
  config.vm.provision "ansible" do |ansible|
    ansible.playbook = "kafkaPlaybook.yml"
  end
end
```
...and the Ansible `kafkaPlaybook.yml` file:
```yaml
---
- hosts: all
  user: vagrant
  sudo: True
  tasks:
    - name: install linux packages
      action: apt update_cache=yes pkg={{item}} state=installed
      with_items:
        - vim
        - openjdk-7-jdk
    - name: make /usr/local/kafka directory
      shell: "mkdir /usr/local/kafka"
    - name: download kafka (the link is from an apache mirror)
      get_url: url=http://apache.spinellicreations.com/kafka/0.8.1.1/kafka-0.8.1.1-src.tgz dest=/usr/local/kafka/kafka-0.8.1.1-src.tgz mode=0440
    - name: untar file
      shell: "tar -xvf /usr/local/kafka/kafka-0.8.1.1-src.tgz -C /usr/local/kafka"
    - name: build kafka with gradle
      shell: "cd /usr/local/kafka/kafka-0.8.1.1-src && ./gradlew jar"
```
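(As an aside, the `mkdir` and `tar` shell tasks above aren't idempotent, so re-provisioning the box will error on the existing directory. A sketch of the same two steps using Ansible's `file` and `unarchive` modules, untested against this playbook:)

```yaml
    - name: make /usr/local/kafka directory
      file: path=/usr/local/kafka state=directory
    - name: untar file
      unarchive: src=/usr/local/kafka/kafka-0.8.1.1-src.tgz dest=/usr/local/kafka copy=no
```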
When I `vagrant up`, the box is provisioned. After I `vagrant ssh` in, I can run a basic producer/consumer test locally, e.g.:
```shell
cd /usr/local/kafka/kafka-0.8.1.1-src
bin/zookeeper-server-start.sh config/zookeeper.properties                # start zookeeper
bin/kafka-server-start.sh config/server.properties                      # start kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # start a producer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning # start a consumer
```
When I type messages in the producer window, they appear in the consumer window. Great.

I then tried to connect to Kafka from the host machine using the kafka-python package:
```python
>>> from kafka import KafkaClient, SimpleProducer
>>> kafka = KafkaClient("127.0.0.1:9092", timeout=120)
>>> kafka.ensure_topic_exists('turkey')
No handlers could be found for logger "kafka"
>>> kafka.ensure_topic_exists('turkey')
>>> producer = SimpleProducer(kafka)
>>> producer.send_messages("turkey", "gobble gobble")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/producer.py", line 261, in send_messages
    return super(SimpleProducer, self).send_messages(topic, partition, *msg)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/producer.py", line 188, in send_messages
    timeout=self.ack_timeout)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 312, in send_produce_request
    resps = self._send_broker_aware_request(payloads, encoder, decoder)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 148, in _send_broker_aware_request
    conn = self._get_conn(broker.host, broker.port)
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/client.py", line 55, in _get_conn
    timeout=self.timeout
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 60, in __init__
    self.reinit()
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 195, in reinit
    self._raise_connection_error()
  File "/Users/awoolford/anaconda/lib/python2.7/site-packages/kafka/conn.py", line 75, in _raise_connection_error
    raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
kafka.common.ConnectionError: Kafka @ precise64:9092 went away
```
The `kafka.ensure_topic_exists` call was made twice. The first time it ran, it returned a warning and then created the topic, so I can see that Python is communicating with Kafka over port 9092. However, I can't send messages to the queue.
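One detail worth noting from the traceback: the final error names `precise64:9092`, not the `127.0.0.1:9092` the client was created with, which suggests that after fetching metadata the client reconnects to whatever host/port the broker advertises. A quick sanity check of whether that hostname even resolves from the host machine (`precise64` is copied from the traceback; `resolves` is a helper I made up for this sketch):

```python
import socket

def resolves(hostname, port=9092):
    """Return True if `hostname` can be resolved from this machine."""
    try:
        socket.getaddrinfo(hostname, port)
        return True
    except socket.gaierror:
        return False

# kafka-python reconnects to the host/port the broker advertises in its
# topic metadata, so that name must resolve from the host machine too.
print(resolves("localhost"))    # the bootstrap address the client was given
print(resolves("precise64"))    # the hostname from the traceback
```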
Can you see what I'm doing wrong?