2012-02-03 149 views
2

我目前正在開發一個使用異步處理分配的系統。信息傳輸是使用隊列完成的。所以一個進程會將信息放入Queue(並終止),另一個進程會將其提交併處理。我的實現使我面臨許多挑戰,我對每個人的方法對這些問題感興趣(在架構和庫方面)。Java異步處理

讓我畫這幅畫。比方說你有三個過程:

Process A -----> Process B 
         | 
Process C <-----------| 

所以進程A把一個消息隊列和結束,進程B拾取消息,處理它,並把它放在一個「回」隊列。 進程C接收消息並對其進行處理。

  1. 如何一個手柄方法B不聽或處理消息從隊列?是否有一些JMS類型的方法阻止生產者在消費者不活躍時提交消息?所以進程A將提交但拋出異常。
  2. 比方說工藝C必須獲得在X分鐘內答覆,但進程B已停止(因任何原因),有沒有一些強制執行的隊列超時機制?所以保證在X分鐘內回覆,將啓動進程C

所有這些事情都可以使用死信來處理某種排隊嗎?我應該用定時器手動完成這一切,並檢查。我提到過JMS,但我對任何事情都很開放,實際上我使用Hazelcast作爲隊列。

請注意,這是更多的一個架構問題,在可用的Java技術和方法方面,我確實認爲這是一個適當的問題。

任何建議將不勝感激。

感謝

+0

你看過阿卡嗎?演員聽起來像是你的情況的理想解決方案。儘管akka在scala中很流行,但它也適用於java。 – Albert 2012-02-03 08:19:49

+0

我會看看阿卡。感謝大家的解決方案。 – Paul 2012-02-03 08:38:52

回答

2

恕我直言,最簡單的解決方案是使用ExecutorService或基於執行程序服務的解決方案。這支持一個工作隊列,計劃任務(用於超時)。

它也可以在一個單獨的過程中工作。 (我相信Hazelcast支持分佈式ExecutorService)

2

在我看來,那你要問的問題的類型是「聞香」是隊列和異步處理可能不適合您的情況最好的工具。

1)失敗了排隊的目的。聽起來就像你需要一個同步的請求 - 響應過程。

2)進程C一般不會得到回覆。它從隊列中獲取消息。如果隊列中有消息,並且進程C已準備就緒,那麼它會得到它。例如,流程C可以決定消息一旦得到就失效了。

+0

我看到你在說什麼,我想你可能是對的。謝謝 – Paul 2012-02-03 06:59:53

1

那麼,排隊的一點就是讓事情保持獨立。

如果你沒有被困在任何特定的技術上,你可以爲你的隊列使用數據庫。

但首先,確保兩個進程協調的簡單機制是使用套接字。如果可行的話,簡單地讓進程B在一些熟知的端口上創建一個開放的套接字監聽器,然後進程A將連接到該套接字並監視它。如果進程B永遠消失了,進程A可以告訴,因爲他們的插座得到關機,並且可以使用,作爲對使用過程B.

問題對於B警報 - > C題,有一個數據庫表:

create table queue (
    id integer, 
    payload varchar(100), // or whatever you can use to indicate a payload 
    status varchar(1), 
    updated timestamp 
) 

然後,進程A將其條目放入隊列中,當前時間和狀態爲「B」。 B,聽隊列:

select * from queue where status = 'B' order by updated 

當B完成時,它更新隊列以將狀態設置爲「C」。

同時,「C」是輪詢分貝:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 

(含但是閾值是多久,你想要的東西腐爛隊列)。

最後,C將隊列行更新爲'D'完成,或刪除它,或者任何你喜歡的。

黑暗的一面是在這裏有一些競爭條件,C可能會試圖抓住一個條目,而B剛剛啓動。你可以通過嚴格的隔離級別和鎖定來完成。簡直如下:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 
FOR UPDATE 

也使用FOR UPDATE進行B的選擇。通過這種方式,贏得選擇比賽的人將在該排上獲得排他性鎖定。

這會讓你在實際功能方面走得很遠。

1

您正期待異步(消息)設置的同步處理的語義是不可能的。我曾參與WebSphere MQ,通常當消費者死亡時,消息永遠保留在隊列中(除非設置過期)。一旦隊列達到其深度,後續消息將被移至死信隊列。

1

我已經使用類似的方法來創建視頻代碼轉換作業的排隊和處理系統。基本上,它的工作方式是:

  1. Process A職位「日程安排」消息Arbiter Q,它增加了工作到它的「等待」隊列。
  2. Process B請求從Arbiter Q請求下一個作業,該作業將刪除其「等待」隊列中的下一個項目(受一些自定義調度邏輯的限制,以確保單個用戶無法氾濫轉碼請求並阻止其他用戶轉碼視頻),並在將作業返回到Process B之前插入其「處理」集。當它進入「處理」集合時,作業被加時間戳。
  3. Process B完成作業併發送一條「完整」消息至Arbiter Q,該消息將作業從「處理」集合中刪除,然後修改某個狀態,以便Process C知道作業已完成。
  4. Arbiter Q定期檢查其「處理」集中的作業,並超時運行異常長時間的任何作業。如果需要,Process A然後可以自由嘗試重新排隊相同的作業。

這是使用JMX實現的(JMS本來更合適,但我離題了)。Process A只是響應用戶啓動的轉碼請求的servlet線程。 Arbiter Q是一個MBean單例(在服務器集羣中的所有節點上持久/複製),它們收到「時間表」和「完整」消息。其內部管理的「隊列」只是List實例,當作業完成時,它會修改應用程序數據庫中的值以引用轉碼後的視頻文件的URL。 Process B是轉碼線程。它的工作僅僅是要求工作,對其進行轉碼,然後在完成時進行報告。一遍又一遍,直到時間結束。 Process C是另一個用戶/ servlet線程。它會看到該URL可用,並向用戶呈現下載鏈接。

在這種情況下,如果Process B已經死亡,那麼工作將永遠坐在「等待」隊列中。然而,實際上,這從未發生過。如果您的Process B沒有運行/正在執行它應該做的事情,那麼我認爲這會在您的部署/配置/實施Process B中提出一個問題,而不是您整體方法中的問題。

2

我想你的第一個問題已經被其他海報充分回答了。

在第二個問題上,根據應用程序使用的消息傳遞引擎,您嘗試執行的操作可能是可能的。我知道這適用於IBM MQ。我已經看到使用WebSphere MQ Classes for Java而不是JMS來完成此操作。它的工作方式是當進程A在隊列中放入消息時,它指定了等待響應消息的時間。如果進程A未能在指定的時間內收到響應消息,則系統會引發相應的異常。

我不認爲在JMS中有一種標準的方式來處理請求/響應超時,因此您可能必須使用特定於平臺的類,例如WebSphere MQ Classes for Java。