2015-11-01 74 views
-1
package port_channel; 
import java.util.*; 
import java.util.concurrent.*; 
import java.io.*; 
import java.net.*; 

public class ChannelPort implements Runnable { 
    int portNum; 
    int nSize; 
    PrintWriter[] outs ; 
    Listner[] listners; 
    ConcurrentLinkedQueue<MessageType> que; 

    public ChannelPort(int portNum, int networkSize) { 
     this.portNum = portNum; 
     this.nSize = networkSize; 
     outs = new PrintWriter[nSize]; 
     listners = new Listner[nSize]; 
     que = new ConcurrentLinkedQueue<MessageType>(); 

    } 

    public void initialize() { 
     ServerSocket serverSocket = null; 
     try { 
      serverSocket = new ServerSocket(portNum); 
     } catch (IOException ioe) { } 
     for (int j = 0; j < nSize; j++) { 
      try { 
       Socket clientSocket = serverSocket.accept();  
       // not part of communication 
       outs[j] = new PrintWriter(new OutputStreamWriter(clientSocket.getOutputStream())); 
       ObjectInputStream in = new ObjectInputStream(clientSocket.getInputStream()); 
       listners[j] = new Listner(j, in, this); 
      } catch (IOException ioe) { 
       System.err.println("Failed in connection for j=" + j); 
       ioe.printStackTrace(); 
       System.exit(-1); 
      } 
     } 
     System.out.println("Connections are all established."); 
    } 
     //thread 
    public void run() { 
     initialize(); 
     for (int j = 0; j < nSize; j++) { 
      listners[j].start(); 
     } 
    } 

    synchronized void gotMessage(MessageType message) { 
     que.offer(message); 
     notifyAll(); 
    } 

    public synchronized MessageType receive() { 
     while (que.isEmpty()) { 
      try { 
       wait(); 
      } catch (InterruptedException ire) { 
       ire.printStackTrace(); 
      } 
     } 
     MessageType msg = que.poll(); 
     System.out.println("receive: " + msg); 
     return msg; 
    } 

    public synchronized void broadcast(String msgStr) { 
     for (int j = 0; j < outs.length; j++) { 
      outs[j].println(msgStr); 
      outs[j].flush(); 
     } 
    } 

    public int getPortNum() { 
     return portNum; 
    } 

    public void setPortNum(int portNum) { 
     this.portNum = portNum; 
    } 

    public int getnSize() { 
     return nSize; 
    } 

    public ConcurrentLinkedQueue<MessageType> getQue() { 
     return que; 
    } 

    public static void main(String[] args) throws IOException, InterruptedException { 
     if (args.length != 2) 
      System.out.println("usage: java ChannelPort port-number number-of-nodes"); 
     int portNum = Integer.parseInt(args[0]); 
     int numNode = Integer.parseInt(args[1]); 
     ChannelPort cp = new ChannelPort(portNum, numNode); 
     new Thread(cp).start(); 
     Thread.sleep(60000); 
     System.out.println("Shutdown"); 
     Iterator<MessageType> ite = cp.getQue().iterator(); 
     while (ite.hasNext()) { 
      System.out.println(ite.next()); 
     } 
    } 
} 
//thread 
class Listner extends Thread { 
    int pId; 
    ObjectInputStream in; 
    ChannelPort cPort; 
    boolean done = false; 
    final int ERR_THRESHOLD = 100; 

    public Listner(int id, ObjectInputStream in, ChannelPort cPort) { 
     this.pId = id; 
     this.in = in; 
     this.cPort = cPort; 
    } 
    public void run() { 
     MessageType msg; 
     int errCnt = 0; 
     while(in != null) { 
      try { 
       msg = (MessageType)in.readObject(); 
       System.out.println("process " + pId + ": " + msg); 
       cPort.gotMessage(msg); 
      } catch (ClassNotFoundException cnfe) { 
       cnfe.printStackTrace(); 
      } catch (SocketException se) { 
       System.err.println(se); 
       errCnt++; 
       if (errCnt > ERR_THRESHOLD) System.exit(0); 
      } catch (IOException ioe) { 
       ioe.printStackTrace(); 
      } 
     } 
    } 
} 
+0

代碼是完全上到下 –

+0

哪裏是線31在電子代碼塊。標記它將有很大幫助。 – sean

+0

這裏沒有NIO。不要盲目標籤。 – EJP

回答

2

您是否檢查了serverSocket創建正確?

您在創建ServerSocket時抑制了異常。

catch (IOException ioe) { 
      //print stack trace and see why exception occurs. 
    } 

serverSocket可能是在第二try塊空,所以serverSocket.accept()拋出NPE,解決這個錯誤

+0

java.net.BindException:地址已在使用:JVM_Bind \t在java.net.DualStackPlainSocketImpl.bind0(本機方法) \t在java.net.DualStackPlainSocketImpl.socketBind(DualStackPlainSocketImpl.java:106 ) \t在java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376) –

+0

上述錯誤的PLZ正在添加解決它 –

+1

@BipinMaurya你有沒有考慮*尋找它?* – EJP

相關問題