2013-02-20 119 views
0

我想同步我的應用程序,因爲有時服務器將消息發送給錯誤的用戶。我使用同步塊來同步隊列,但我的解決方案不起作用 - 有時用戶不會收到消息。使用同步塊的同步隊列

下面是代碼(server.java): (InWorker - 從用戶收到的消息,OutWorker - 向用戶發送消息)每個用戶都擁有自己的類(線程) - MiniServer(包含兩個線程:InWorkerOutWorker)。

class InWorker implements Runnable{ 

String slowo=null; 
ObjectOutputStream oos; 
ObjectInputStream ois; 
ConcurrentMap<String,LinkedBlockingQueue<Message>> map=new ConcurrentHashMap<String, LinkedBlockingQueue<Message>>(); 
Message message=null; 

InWorker(ObjectInputStream ois,ConcurrentMap<String,LinkedBlockingQueue<Message>> map) { 
    this.ois=ois; 
    this.map=map; 
} 

public void run() { 

    while(true) { 
      //synchronized(queue) { 
     try { 
      message = (Message) ois.readObject(); 
      slowo=message.msg; 
      if(slowo!=null && !slowo.equals("Bye")) { 
         if(!map.containsKey(message.id)) { 
          map.putIfAbsent(message.id, new LinkedBlockingQueue<Message>()); 
         try { 
          map.get(message.id).put(message); 
         } catch (InterruptedException ex) { 
          Logger.getLogger(Communicator.class.getName()).log(Level.SEVERE, null, ex); 
         } 
         } 
         else 
         { 
         try { 
          map.get(message.id).put(message); 
         } catch (InterruptedException ex) { 
          Logger.getLogger(Communicator.class.getName()).log(Level.SEVERE, null, ex); 
         } 
         } 
         } 

     } catch (ClassNotFoundException e) { 
      e.printStackTrace(); 
     } catch (IOException e) { 
     e.printStackTrace(); 
     } 
      //} 
     Thread.yield(); 
     } 
} 
} 

class OutWorker implements Runnable{ 

String tekst=null; 
ObjectOutputStream oos=null; 
String id; 
Message message; 
ConcurrentMap<String,LinkedBlockingQueue<Message>> map=new ConcurrentHashMap<String, LinkedBlockingQueue<Message>>(); 

OutWorker(ObjectOutputStream oos,String id,ConcurrentMap<String,LinkedBlockingQueue<Message>> map) { 
    this.oos=oos; 
    this.id=id; 
    this.map=map; 
} 

public void run() { 
    while(true) { 
      //synchronized(queue) { 
       if(map.containsKey(id)) { 
       while(!map.get(id).isEmpty()) { 
         try { 
          message=map.get(id).take(); 
         } catch (InterruptedException ex) { 
          Logger.getLogger(OutWorker.class.getName()).log(Level.SEVERE, null, ex); 
         } 
         try { 
          oos.writeObject(message); 
          oos.flush(); 
         } catch (IOException e) { 
          e.printStackTrace(); 
         } 
       } 

       } 
      //} 
     Thread.yield(); 
}}} 

這裏是MiniServer和服務器類:

class MiniSerwer implements Runnable{ 

    Socket socket=null; 
    ExecutorService exec=Executors.newCachedThreadPool(); 
    ObjectOutputStream oos=null; 
    ObjectInputStream ois=null; 
    String id; 
    Queue<Message> queue=new LinkedList<Message>(); 

    MiniSerwer(ObjectOutputStream oos,ObjectInputStream ois,String id,Queue<Message> queue) { 
     this.oos=oos; 
       this.ois=ois; 
     this.id=id; 
     this.queue=queue; 
    } 

    public void run() { 
      exec.execute(new InWorker(ois,queue)); // input stream 
      exec.execute(new OutWorker(oos,id,queue)); //output stream 
      Thread.yield(); 
    } 
} 

public class Serwer implements Runnable{ 

ServerSocket serversocket=null; 
ExecutorService exec= Executors.newCachedThreadPool(); 
int port; 
String id=null; 
Queue<Message> queue=new LinkedList<Message>(); 
BufferedReader odczyt=null; 

ObjectInputStream ois=null; 
Message message=null; 
ObjectOutputStream oos=null; 

Serwer(int port) { 
    this.port=port; 
} 

public void run() { 
    try { 
     serversocket=new ServerSocket(port); 
     while(true) { 
      Socket socket=null; 
      try { 
       socket = serversocket.accept();         
           /* first message is login*/ 
           oos=new ObjectOutputStream(socket.getOutputStream()); 
           oos.flush(); 
           ois=new ObjectInputStream(socket.getInputStream()); 
           message = (Message) ois.readObject(); 
           id=message.sender; 
           System.out.println(id+" log in to the server"); 

           exec.execute(new MiniSerwer(oos,ois,id,queue)); // create new thread 
      } catch (IOException e) { 
       e.printStackTrace(); 
       throw new RuntimeException(e); 
      } 
         catch (ClassNotFoundException e) { 
          e.printStackTrace(); 
         }  
     } 
    } catch (IOException e) { 
     e.printStackTrace(); 
} 
} 

public static void main(String[] args) { 
    int port; 
     port=8821; 
    ExecutorService exec=Executors.newCachedThreadPool(); 
    exec.execute(new Serwer(port)); 
} 

誰能幫助我?

編輯:我將隊列更改爲ConcurrentHashMap,但有時會將消息發送給錯誤的用戶。爲什麼?

+0

「不起作用」是什麼意思? – jtahlborn 2013-02-20 17:07:28

+0

您應該查看['java.util.concurrent'](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html)包一個「開箱即用」的線程安全隊列。 – mre 2013-02-20 17:09:10

+0

有時用戶收到不是他的消息 – user1518451 2013-02-20 17:09:48

回答

5

這是一個典型的生產者/消費者場景。溝槽同步的塊並使用BlockingQueue(InWorker調用put()和OutWorker調用take())。

也是,在你的服務器類,你應該創建一個新的隊列連接,而不是共享相同的一個跨所有連接。

+0

我將隊列更改爲ConcurrentHashMap,但服務器有時會將消息發送給錯誤的用戶。你能有其他想法嗎?我添加了對第一篇文章(InWorker和OutWorker)的更改。 – user1518451 2013-02-20 19:35:31

+0

@ user1518451 - 首先,我從來沒有對ConcurrentHashMap說過什麼,我說過一個_BlockingQueue_。其次,你是否進行了更改,以便同一隊列不被多個用戶共享(我的答案中的第二句話)? – jtahlborn 2013-02-20 21:03:51

+0

好的,但我不記得我怎麼能發送消息到正確的接收器(每個連接使用新的隊列)。在我的解決方案中,我將queue.element()中的id與消息中的id進行比較。如果每個連接都有自己的隊列,我該怎麼做? – user1518451 2013-02-20 22:08:46