2011-08-17 174 views
0

使用Ruby amqp庫和Ruby 1.8.7的v0.7.1,我試圖向RabbitMQ服務器發送大量(百萬)短(〜40字節)消息。我的程序的主循環(當然,不是一個真正的循環,但仍然)是這樣的:向AMQP隊列發送大量消息

AMQP.start(:host => '1.2.3.4', 
     :username => 'foo', 
     :password => 'bar') do |connection| 

    channel = AMQP::Channel.new(connection) 
    exchange = channel.topic("foobar", {:durable => true}) 
    i = 0 

    EM.add_periodic_timer(1) do 
    print "\rPublished #{i} commits" 
    end 

    results = get_results # <- Returns an array 

    processor = proc do 
    if x = results.shift then 
     exchange.publish(x, :persistent => true, 
         :routing_key => "test.#{i}") 
     i += 1 
     EM.next_tick processor 
     end 
    end 
    EM.next_tick(processor) 
    AMQP.stop {EM.stop} end 

代碼開始處理結果陣列就好了,但經過一段時間(通常情況下,後12K的消息左右),它具有以下錯誤的模具

/Library/Ruby/Gems/1.8/gems/amqp-0.7.1/lib/amqp/channel.rb:807:in `send': 
The channel 1 was closed, you can't use it anymore! (AMQP::ChannelClosedError) 

沒有消息存儲在隊列中。錯誤似乎在從程序到隊列服務器的網絡活動開始時發生。

我在做什麼錯?

+0

RabbitMQ日誌說什麼?經紀人仍在運行? 'lsof -i:5672'返回什麼? –

+0

沒什麼特別的,它表示在腳本啓動時打開一個連接,然後關閉連接。甚至在我的代碼失敗後,RabbitMQ仍能正常地爲其他隊列和客戶端服務。我不認爲這是RabbitMQ的問題。 –

回答

-1

第一個錯誤是您沒有發佈您正在使用的RabbitMQ版本。許多人正在運行陳舊的舊版本1.7.2,因爲這是他們的OS軟件包存儲庫中的內容。對於發送您的消息量的任何人都是不好的舉動。從RabbitMQ站點獲取RabbitMQ 2.5.1,並擺脫默認的系統包。

第二個錯誤是您沒有告訴我們RabbitMQ日誌中的內容。

第三個錯誤是你沒有說什麼消費信息。是否有另一個進程運行某個地方聲明瞭隊列並將其綁定到交換機上。除非有人向RabbitMQ聲明並將其綁定到交換機,否則有消息隊列。即使這樣,如果隊列的綁定密鑰與您發佈的路由密鑰匹配,消息也只會流動。

第四個錯誤。您將路由密鑰和綁定密鑰混合起來。路由密鑰是一個字符串,例如topic.test.json.echos,綁定密鑰(用於將隊列綁定到交換機)是一種類似主題。#或主題的模式。 .json。

你澄清後更新 至於版本,我不知道,當它被固定的,但是有一個問題,在1.7.2有大量造成的RabbitMQ持久性消息的崩潰,當它滑過它的持久性日誌,並在崩潰後,它無法重新啓動,直到有人手動取消翻轉。

當你說連接正在打開和關閉時,我希望它不是每條消息。這將是使用AMQP的一種奇怪的方式。

讓我重複一遍。生產者做不是將消息寫入隊列。他們將消息寫入交換機,然後根據路由鍵(字符串)和隊列的綁定鍵(模式)將消息路由到隊列。在你的例子中,我錯誤地使用了#號,但是我沒有看到任何聲明隊列並將它綁定到交換機的東西。

+0

讓我澄清一下:1.的確,我正在使用Debian的默認版本(1.8.1)。使用Python的AMQP庫,這個版本可以忍受數十萬條消息而不會冒汗。 2.沒有什麼特別的,只是連接打開,然後關閉。 3.有幾個消費者聲明適當的隊列綁定(經過驗證的工作)。但是,消費者不應該影響生產者將消息寫入隊列的能力。不,這不是一個隊列綁定,這是Ruby替換字符串的方式。 –

+0

總而言之,我認爲這是一個Ruby AMQP/eventmachine bug,而不是我的代碼或RabbitMQ中的錯誤。整個設置對於少量消息正常工作。 –