2010-11-09 51 views
1

我有這樣的方法來寫郵件到插座插座:關閉時,所有待處理的郵件已發送

public void sendMessage(byte[] msgB) { 
    try { 
     synchronized (writeLock) { 
      log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB)); 
      ous.write(HEADER_MSG); 
      ous.writeInt(msgB.length); 
      ous.write(msgB); 
      ous.flush(); 
     } 
    } catch (IOException e) { 
     throw new RuntimeException(e); 
    } 
} 

現在一個線程調用鮑勃想在某個undeterministic時刻X關閉套接字,這意味着可能仍有線程等待writeLock發送他們的消息,甚至可能有一個線程正在寫入它的中間。

我可以通過關閉套接字之前讓鮑勃獲取writeLock解決了後者,但我仍然可以失去尚未開始發送消息,因爲據我所知​​是不公平的,鮑勃能拿在一些等待時間更長的其他線程之前鎖定。

我需要的是所有的呼叫sendMessage作出X前通常做他們的工作,並要求作出後X拋出一個錯誤。我怎樣才能做到這一點?

  • 具體細節:鮑勃是線程從套接字的輸入流和X當在該流接收到「關閉」消息是讀取。

回答

1

我想我可以用一個ReentrantLock替換同步塊設置爲公平的。

2

考慮使用單線程ExecutorService來執行消息的寫入。發送線程只需通過調用execute(Runnable)submit(Callable)來嘗試「發送」他們的消息。一旦您希望停止發送消息,就會關閉ExecutorServiceshutdown()),導致後續調用提交/執行以產生RejectedExecutionException

這種方法的優點是你只有一個I/O綁定線程和較少的鎖定爭用比如果你有多個線程等待自己寫消息。這也是一個更好的問題分離。

下面是一個簡單的例子,OO-ifies的問題有點多:

public interface Message { 
    /** 
    * Writes the message to the specified stream. 
    */ 
    void writeTo(OutputStream os); 
} 

public class Dispatcher { 
    private final ExecutorService sender; 
    private final OutputStream out; 

    public Dispatcher() { 
    this.sender = Executors.newSingleThreadExecutor(); 
    this.out = ...; // Set up output stream. 
    } 

    /** 
    * Enqueue message to be sent. Return a Future to allow calling thread 
    * to perform a blocking get() if they wish to perform a synchronous send. 
    */ 
    public Future<?> sendMessage(final Message msg) { 
    return sender.submit(new Callable<Void>() { 
     public Void call() throws Exception { 
     msg.writeTo(out); 
     return null; 
     } 
    }); 
    } 

    public void shutDown() { 
    sender.shutdown(); // Waits for all tasks to finish sending. 

    // Close quietly, swallow exception. 
    try { 
     out.close(); 
    } catch (IOException ex) { 
    } 
    } 
} 
+0

我用我自己的方法,因爲我不想添加另一個線程到已經有數百個,但upvoted無論如何因爲它在技術上是正確的方式來做到這一點。 – 2010-11-18 16:07:07

2

你可以在這裏使用一個執行者。由於每個發送消息都是同步的(我假設在一個共享對象上),因此可以使用線程限制。

static final ExecutorService executor = Executors.newSingleThreadExecutor(); 

public void sendMessage(byte[] msgB) { 
    executor.submit(new Runnable() { 
     public void run() { 
      try { 
       log.debug("Sending message (" + msgB.length + "): " + HexBytes.toHex(msgB)); 
       ous.write(HEADER_MSG); 
       ous.writeInt(msgB.length); 
       ous.write(msgB); 
       ous.flush(); 
      } catch (IOException e) { 
       throw new RuntimeException(e); 
      } 
     } 
    }); 
} 
public static void atMomentX(){ 
    executor.shutdown(); 
} 

當完成另一個線程可以調用atMomentX();

Javadoc中的關機方法說:

發起一個有序的關閉,其中 以前提交的任務執行 ,但沒有新的任務將是 接受。如果已關閉 ,調用沒有 附加效果。

+0

我使用我自己的方法,因爲我不想將另一個線程添加到已經有數百個服務器系統,但無論如何upvoted,因爲它在技術上是正確的方式來做到這一點。 – 2010-11-18 16:07:26