我想同步我的應用程序,因爲有時服務器將消息發送給錯誤的用戶。我使用同步塊來同步隊列,但我的解決方案不起作用 - 有時用戶不會收到消息。使用同步塊的同步隊列
下面是代碼(server.java): (InWorker - 從用戶收到的消息,OutWorker
- 向用戶發送消息)每個用戶都擁有自己的類(線程) - MiniServer
(包含兩個線程:InWorker
和OutWorker
)。
class InWorker implements Runnable{
String slowo=null;
ObjectOutputStream oos;
ObjectInputStream ois;
ConcurrentMap<String,LinkedBlockingQueue<Message>> map=new ConcurrentHashMap<String, LinkedBlockingQueue<Message>>();
Message message=null;
InWorker(ObjectInputStream ois,ConcurrentMap<String,LinkedBlockingQueue<Message>> map) {
this.ois=ois;
this.map=map;
}
public void run() {
while(true) {
//synchronized(queue) {
try {
message = (Message) ois.readObject();
slowo=message.msg;
if(slowo!=null && !slowo.equals("Bye")) {
if(!map.containsKey(message.id)) {
map.putIfAbsent(message.id, new LinkedBlockingQueue<Message>());
try {
map.get(message.id).put(message);
} catch (InterruptedException ex) {
Logger.getLogger(Communicator.class.getName()).log(Level.SEVERE, null, ex);
}
}
else
{
try {
map.get(message.id).put(message);
} catch (InterruptedException ex) {
Logger.getLogger(Communicator.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
//}
Thread.yield();
}
}
}
class OutWorker implements Runnable{
String tekst=null;
ObjectOutputStream oos=null;
String id;
Message message;
ConcurrentMap<String,LinkedBlockingQueue<Message>> map=new ConcurrentHashMap<String, LinkedBlockingQueue<Message>>();
OutWorker(ObjectOutputStream oos,String id,ConcurrentMap<String,LinkedBlockingQueue<Message>> map) {
this.oos=oos;
this.id=id;
this.map=map;
}
public void run() {
while(true) {
//synchronized(queue) {
if(map.containsKey(id)) {
while(!map.get(id).isEmpty()) {
try {
message=map.get(id).take();
} catch (InterruptedException ex) {
Logger.getLogger(OutWorker.class.getName()).log(Level.SEVERE, null, ex);
}
try {
oos.writeObject(message);
oos.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//}
Thread.yield();
}}}
這裏是MiniServer和服務器類:
class MiniSerwer implements Runnable{
Socket socket=null;
ExecutorService exec=Executors.newCachedThreadPool();
ObjectOutputStream oos=null;
ObjectInputStream ois=null;
String id;
Queue<Message> queue=new LinkedList<Message>();
MiniSerwer(ObjectOutputStream oos,ObjectInputStream ois,String id,Queue<Message> queue) {
this.oos=oos;
this.ois=ois;
this.id=id;
this.queue=queue;
}
public void run() {
exec.execute(new InWorker(ois,queue)); // input stream
exec.execute(new OutWorker(oos,id,queue)); //output stream
Thread.yield();
}
}
public class Serwer implements Runnable{
ServerSocket serversocket=null;
ExecutorService exec= Executors.newCachedThreadPool();
int port;
String id=null;
Queue<Message> queue=new LinkedList<Message>();
BufferedReader odczyt=null;
ObjectInputStream ois=null;
Message message=null;
ObjectOutputStream oos=null;
Serwer(int port) {
this.port=port;
}
public void run() {
try {
serversocket=new ServerSocket(port);
while(true) {
Socket socket=null;
try {
socket = serversocket.accept();
/* first message is login*/
oos=new ObjectOutputStream(socket.getOutputStream());
oos.flush();
ois=new ObjectInputStream(socket.getInputStream());
message = (Message) ois.readObject();
id=message.sender;
System.out.println(id+" log in to the server");
exec.execute(new MiniSerwer(oos,ois,id,queue)); // create new thread
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port;
port=8821;
ExecutorService exec=Executors.newCachedThreadPool();
exec.execute(new Serwer(port));
}
誰能幫助我?
編輯:我將隊列更改爲ConcurrentHashMap,但有時會將消息發送給錯誤的用戶。爲什麼?
「不起作用」是什麼意思? – jtahlborn 2013-02-20 17:07:28
您應該查看['java.util.concurrent'](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html)包一個「開箱即用」的線程安全隊列。 – mre 2013-02-20 17:09:10
有時用戶收到不是他的消息 – user1518451 2013-02-20 17:09:48