你需要稍微權衡是否有很多高端機器和偶爾的消息對每個或幾個高端機器和頻繁的消息到每個。
如果你有很多高端機器,那麼從字面上有每個高端機一個線程聽起來有點潔癖,除非你真的要在這裏不停流消息,所有這些機器。我會建議有一個線程池,它只會在特定範圍內增長。爲此,您可以使用ThreadPoolExecutor。當您需要發佈一個消息,你實際上提出一個可運行的執行人將發送消息:
Executor msgExec = new ThreadPoolExecutor(...);
public void sendMessage(final String machineId, byte[] message) {
msgExec.execute(new Runnable() {
public void run() {
sendMessageNow(machineId, message);
}
});
}
private void sendMessageNow(String machineId, byte[] message) {
// open connection to machine and send message, thinking
// about the case of two simultaneous messages to a machine,
// and whether you want to cache connections.
}
如果你只是有幾個高端機器,那麼你可以有每臺機器的BlockingQueue,並且一個線程每個阻塞隊列等待下一條消息。在這種情況下,該模式是更喜歡這個(小心未經檢驗的關閉頂級頭週日上午代碼):
ConcurrentHashMap<String,BockingQueue> queuePerMachine;
public void sendMessage(String machineId, byte[] message) {
BockingQueue<Message> q = queuePerMachine.get(machineId);
if (q == null) {
q = new BockingQueue<Message>();
BockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q);
if (prev != null) {
q = prev;
} else {
(new QueueProessor(q)).start();
}
}
q.put(new Message(message));
}
private class QueueProessor extends Thread {
private final BockingQueue<Message> q;
QueueProessor(BockingQueue<Message> q) {
this.q = q;
}
public void run() {
Socket s = null;
for (;;) {
boolean needTimeOut = (s != null);
Message m = needTimeOut ?
q.poll(60000, TimeUnit.MILLISECOND) :
q.take();
if (m == null) {
if (s != null)
// close s and null
} else {
if (s == null) {
// open s
}
// send message down s
}
}
// add appropriate error handling and finally
}
}
在這種情況下,我們關閉了連接,如果那臺機器沒有消息在60秒內到達。
您應該使用JMS嗎?那麼,你必須權衡這是否聽起來很複雜。我個人的感覺是,要保證一個特殊的框架並不複雜。但我確定意見不同。
P.S.實際上,現在我看着這個,你可能會把隊列放入線程對象中,並且只映射機器ID - >線程對象。無論如何,你明白了。