2009-04-26 55 views
0

我正在研究與Java相關的實際場景;一個套接字程序。現有的系統和預期的系統如下。我應該如何處理Java中的多線程?

現有系統 - 系統檢查是否滿足某些條件。如果是這樣會創建一些消息發送並放入隊列中。

隊列處理器是一個單獨的線程。它定期檢查隊列中是否存在物品。如果發現任何項目(消息),它只是將消息發送到遠程主機(硬編碼)並從隊列中刪除項目。

預期系統 - 這是類似的東西。該消息在滿足某個條件時創建,但在每種情況下收件人都不相同。所以有很多方法。

  1. 將消息放入同一個隊列中,但使用其接收方ID。在這種情況下,第二個線程可以識別接收器,以便可以將消息發送到該接收器。

  2. 有多個線程。在這種情況下,當條件滿足時,如果接收器在「新建」中,它創建一個新的隊列並將消息放入該隊列中。並且一個新線程將初始化以處理該隊列。如果下一條消息被定向到相同的收件人,它應該放在同一個隊列中,如果沒有新的隊列並且該線程應該被創建。

現在我想實現第二個,有點stucked。我應該怎麼做?一個骨架就足夠了,你不需要擔心如何創建隊列等...... :)

更新:我也認爲方法1是最好的方法來做到這一點。我閱讀了關於線程的一些文章,並做出了這個決定。但是,如何實施方法2也是非常值得的。

回答

1

首先,如果你計劃有很多接收機,我不會使用一個線程AND-隊列PER-接收機方法。你可能會遇到很多線程在大多數情況下沒有做任何事情,我可能會傷害你的表現。另一種方法是使用工作線程的線程池,只從共享隊列中選擇任務,每個任務都有自己的接收方ID,並且可能還有一個共享字典,其中每個接收方都有套接字連接供工作線程使用。

話雖這麼說,如果你仍然想追求你的方法是什麼,你可以做的是:

1)創建一個新的類來處理新的線程執行:

public class Worker implements Runnable { 
    private Queue<String> myQueue = new Queue<String>(); 
    public void run() 
    { 
     while (true) { 
      string messageToProcess = null; 
      synchronized (myQueue) { 
      if (!myQueue.empty()) { 
       // get your data from queue 
       messageToProcess = myQueue.pop(); 
      } 
      } 
      if (messageToProcess != null) { 
      // do your stuff 
      } 
      Thread.sleep(500); // to avoid spinning 
     } 
    } 
    public void queueMessage(String message) 
    { 
     synchronized(myQueue) { 
     myQueue.add(message); 
     } 
    } 
} 

2)在您的主線程,創建消息並使用字典(散列表)來查看接收者的線程是否已經創建。如果是,則只是將新消息排隊。如果沒有,創建一個新線程,將其放入散列表並將新消息排隊:

while (true) { 
    String msg = getNewCreatedMessage(); // you get your messages from here 
    int id = getNewCreatedMessageId(); // you get your rec's id from here 
    Worker w = myHash(id); 
    if (w == null) { // create new Worker thread 
     w = new Worker(); 
     new Thread(w).start(); 
    } 
    w.queueMessage(msg); 
} 

祝你好運。

編輯:您可以提高使用BlockingQueue布賴恩這種方法提到的這個方法。

2

我可以建議你看看BlockingQueue嗎?你的調度過程可以寫入這個隊列(put),客戶端可以採用線程安全的方式。所以你根本不需要編寫隊列實現。

如果您有一個包含不同消息類型的隊列,那麼您將需要爲每個客戶端實現一些窺視類型的機制(即他們必須檢查隊列的頭部並只取其所屬)。爲了有效地工作,消費者必須以及時和強大的方式提取他們所需的數據。

如果每個消息/消費者類型有一個隊列/線程,那麼這將更容易/更可靠。

您的客戶端實現將只需要循環的:

while (!done) { 
    Object item = queue.take(); 
    // process item 
} 

注意,隊列可以使用仿製藥,並take()阻止。

當然,由於多個消費者正在拍攝不同類型的消息,因此您可能需要考慮space-based architecture。這不會有隊列(FIFO)特性,但會以非常簡單的方式爲您提供多個消費者。

2

你需要稍微權衡是否有很多高端機器和偶爾的消息對每個或幾個高端機器和頻繁的消息到每個。

如果你有很多高端機器,那麼從字面上有每個高端機一個線程聽起來有點潔癖,除非你真的要在這裏不停流消息,所有這些機器。我會建議有一個線程池,它只會在特定範圍內增長。爲此,您可以使用ThreadPoolExecutor。當您需要發佈一個消息,你實際上提出一個可運行的執行人將發送消息:

Executor msgExec = new ThreadPoolExecutor(...); 

public void sendMessage(final String machineId, byte[] message) { 
    msgExec.execute(new Runnable() { 
    public void run() { 
     sendMessageNow(machineId, message); 
    } 
    }); 
} 

private void sendMessageNow(String machineId, byte[] message) { 
    // open connection to machine and send message, thinking 
    // about the case of two simultaneous messages to a machine, 
    // and whether you want to cache connections. 
} 

如果你只是有幾個高端機器,那麼你可以有每臺機器的BlockingQueue,並且一個線程每個阻塞隊列等待下一條消息。在這種情況下,該模式是更喜歡這個(小心未經檢驗的關閉頂級頭週日上午代碼):

ConcurrentHashMap<String,BockingQueue> queuePerMachine; 

public void sendMessage(String machineId, byte[] message) { 
    BockingQueue<Message> q = queuePerMachine.get(machineId); 
    if (q == null) { 
    q = new BockingQueue<Message>(); 
    BockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q); 
    if (prev != null) { 
     q = prev; 
    } else { 
     (new QueueProessor(q)).start(); 
    } 
    } 
    q.put(new Message(message)); 
} 

private class QueueProessor extends Thread { 
    private final BockingQueue<Message> q; 
    QueueProessor(BockingQueue<Message> q) { 
    this.q = q; 
    } 
    public void run() { 
    Socket s = null; 
    for (;;) { 
     boolean needTimeOut = (s != null); 
     Message m = needTimeOut ? 
     q.poll(60000, TimeUnit.MILLISECOND) : 
     q.take(); 
     if (m == null) { 
     if (s != null) 
      // close s and null 
     } else { 
     if (s == null) { 
      // open s 
     } 
     // send message down s 
     } 
    } 
    // add appropriate error handling and finally 
    } 
} 

在這種情況下,我們關閉了連接,如果那臺機器沒有消息在60秒內到達。

您應該使用JMS嗎?那麼,你必須權衡這是否聽起來很複雜。我個人的感覺是,要保證一個特殊的框架並不複雜。但我確定意見不同。

P.S.實際上,現在我看着這個,你可能會把隊列放入線程對象中,並且只映射機器ID - >線程對象。無論如何,你明白了。

2

您可以嘗試使用SomnifugiJMS,使用java.util.concurrent中的各種實際的「引擎」 VM內的JMS實現。

這將可能是你的目的有點矯枉過正,但很可能使你的應用程序要被分發用於幾乎沒有額外的編程(如適用),您只需插入一個不同的JMS實現像ActiveMQ就大功告成了。