2011-08-24 152 views
6

我有大量的MongoDB集合,它們從各種流源獲取大量JSON文檔。換句話說,有很多進程不斷地將數據插入到一組MongoDB集合中。MongoDb實時(或接近實時)流式輸出插入的數據

我需要一種方法將數據從MongoDB流式傳輸到下游應用程序。所以,我希望有一個系統,從概念上看起來是這樣的:

App Stream1 --> 
App Stream2 -->  MONGODB  ---> Aggregated Stream 
App Stream3 --> 

OR這樣的:

App Stream1 -->     ---> MongoD Stream1 
App Stream2 -->  MONGODB  ---> MongoD Stream2 
App Stream3 -->     ---> MongoD Stream3 

的問題是我怎麼流出來的數據蒙戈,而不必不斷地輪詢/查詢數據庫?

顯而易見的問題答案是「爲什麼不改變這些應用程序流式處理來發送消息到像Rabbit,Zero或ActiveMQ這樣的隊列,然後讓它們一次發送到Mongo流處理和Mongo像這樣:」

    MONGODB 
        /|\ 
        | 
App Stream1 -->  |   ---> MongoD Stream1 
App Stream2 --> SomeMQqueue ---> MongoD Stream2 
App Stream3 -->    ---> MongoD Stream3 

在一個理想的世界是的,這將是一件好事,但我們需要蒙戈以確保郵件先保存起來,以避免重複,並確保ID是所有生成等蒙戈在中間作爲持久坐層。

那麼如何將消息從Mongo集合中流出(不使用GridFS等)到這些下游應用中。基本的思想流派只是對新文檔進行輪詢,並且通過向存儲在數據庫中的JSON文檔添加另一個字段來更新所收集的每個文檔,就像存儲處理時間戳的SQL表中的進程標誌一樣。即每處理1秒輪詢文檔處理== null .... add processed = now()....更新文檔。

是否有更整潔/更具計算效率的方法?

僅供參考 - 這些都是Java進程。

乾杯!

回答

3

如果您正在寫入一個上限集合(或多個集合),則可以使用tailablecursor在流上推送新數據,或者在可從其流出數據的消息隊列上推送新數據。然而,這對於無限制的收集不起作用。

+0

感謝您的鏈接。可悲的是不使用封頂的集合,但對於消息服務來說並不是一個壞的功能。聽起來就像處理過的標誌上的索引,輪詢是唯一的選擇...如果索引項爲空,它仍然在索引中引用,或者查詢空仍意味着收集掃描? – NightWolf

+1

或者我可以在一個固定大小的行爲像一個緩存,然後拉出物品一買1,並把它們放回到一個普通的集合。那麼問題就變成了我們如何在app運行之間保存位置光標?我假設我們只是使用Mongo自動生成的_id字段並選擇大於該ID字段的所有內容......是否所有mongo生成的_id都是按照遞增的順序排列的? – NightWolf

+1

索引存儲'null'的條目。如果你要加標籤的集合,你確實需要存儲你看到的最後一個條目(不管你想要存儲這個,使用另一個mongo集合都可以正常工作),然後在該元素處使用'$ min '和'跳過(1)'恢復。請參閱http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-%24minand%24max – dcrosta