使用Ruby amqp庫和Ruby 1.8.7的v0.7.1,我試圖向RabbitMQ服務器發送大量(百萬)短(〜40字節)消息。我的程序的主循環(當然,不是一個真正的循環,但仍然)是這樣的:向AMQP隊列發送大量消息
AMQP.start(:host => '1.2.3.4',
:username => 'foo',
:password => 'bar') do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.topic("foobar", {:durable => true})
i = 0
EM.add_periodic_timer(1) do
print "\rPublished #{i} commits"
end
results = get_results # <- Returns an array
processor = proc do
if x = results.shift then
exchange.publish(x, :persistent => true,
:routing_key => "test.#{i}")
i += 1
EM.next_tick processor
end
end
EM.next_tick(processor)
AMQP.stop {EM.stop} end
代碼開始處理結果陣列就好了,但經過一段時間(通常情況下,後12K的消息左右),它具有以下錯誤的模具
/Library/Ruby/Gems/1.8/gems/amqp-0.7.1/lib/amqp/channel.rb:807:in `send':
The channel 1 was closed, you can't use it anymore! (AMQP::ChannelClosedError)
沒有消息存儲在隊列中。錯誤似乎在從程序到隊列服務器的網絡活動開始時發生。
我在做什麼錯?
RabbitMQ日誌說什麼?經紀人仍在運行? 'lsof -i:5672'返回什麼? –
沒什麼特別的,它表示在腳本啓動時打開一個連接,然後關閉連接。甚至在我的代碼失敗後,RabbitMQ仍能正常地爲其他隊列和客戶端服務。我不認爲這是RabbitMQ的問題。 –