2010-04-21 95 views
4

我是多線程新手& Java編程的套接字編程。我想知道什麼是實現2個線程的最佳方式 - 一個用於接收套接字,另一個用於發送套接字。如果我想要做的事聽起來很荒謬,請讓我知道爲什麼!代碼主要來自Sun的在線教程。我想使用多播套接字,以便我可以使用多播組。Java:多線程和UDP套接字編程

class Server extends Thread 
{ 

    static protected MulticastSocket socket = null; 
    protected BufferedReader in = null; 
    public InetAddress group; 

    private static class Receive implements Runnable 
    { 

     public void run() 
     { 
      try 
      { 
       byte[] buf = new byte[256]; 
       DatagramPacket pkt = new DatagramPacket(buf,buf.length); 
       socket.receive(pkt); 
       String received = new String(pkt.getData(),0,pkt.getLength()); 
       System.out.println("From [email protected]" + received);   
       Thread.sleep(1000); 
      } 
      catch (IOException e) 
      { 
       System.out.println("Error:"+e); 
      } 
      catch (InterruptedException e) 
      { 
       System.out.println("Error:"+e); 
      } 

     } 

    } 


    public Server() throws IOException 
    { 
     super("server"); 
     socket = new MulticastSocket(4446); 
     group = InetAddress.getByName("239.231.12.3"); 
     socket.joinGroup(group); 
    } 

    public void run() 
    { 

     while(1>0) 
     { 
      try 
      { 
       byte[] buf = new byte[256]; 
       DatagramPacket pkt = new DatagramPacket(buf,buf.length);   
       //String msg = reader.readLine(); 
       String pid = ManagementFactory.getRuntimeMXBean().getName(); 
       buf = pid.getBytes(); 
       pkt = new DatagramPacket(buf,buf.length,group,4446); 
       socket.send(pkt); 
       Thread t = new Thread(new Receive()); 
       t.start(); 

       while(t.isAlive()) 
       { 
        t.join(1000); 
       } 
       sleep(1); 
      } 
      catch (IOException e) 
      { 
       System.out.println("Error:"+e); 
      } 
      catch (InterruptedException e) 
      { 
       System.out.println("Error:"+e); 
      } 

     } 
     //socket.close(); 
    } 

    public static void main(String[] args) throws IOException 
    { 
     new Server().start(); 
     //System.out.println("Hello"); 
    } 

} 
+0

你最終的目標是什麼? – Xailor 2010-04-21 23:48:43

+0

@Ravi,我修正了你的格式,但你應該編輯類名......讓它們以大寫字母開頭。當你的班級名稱以小寫字母開頭時,閱讀你的代碼是很痛苦的。 – Kiril 2010-04-22 00:49:49

+0

@Xepoch:我的最終目標是在分佈式系統中實現某些協議 @Lirik:對於類名稱感到抱歉!我現在修好了。 – Ravi 2010-04-22 12:44:22

回答

2

婉婷在應用程序中創建線程是不是荒謬的!您不需要完全2個線程,但我認爲您正在討論實現Runnable接口的2個類。

自Java 1.5以來,線程API已經變得更好,而且您不需要再亂用java.lang.Thread。您可以簡單地創建一個java.util.concurrent.Executor並向其提交Runnable實例。

本書Java Concurrency in Practice使用了這個確切的問題 - 創建一個線程套接字服務器 - 並且遍歷代碼的幾次迭代以顯示最佳方式。查看免費樣本章節,這非常棒。我不會在這裏複製/粘貼代碼,但是請特別注意列表6.8。

+0

謝謝德魯,我不知道你能做到這一點!我會馬上看看concurrentExecutor – Ravi 2010-04-22 12:43:12

+0

__Careful !!! __雖然在產生的線程中執行阻塞操作總是完全可以的(然後它會阻塞一段時間),但這樣做可能是致命的在傳遞給'java.util.concurrent.Executor'的'Runnable'實例中。爲什麼?因爲Executor不保證在另一個線程上運行代碼。它也可以在調用線程上運行代碼。從文檔:「但是,Executor接口不嚴格要求執行異步。」。所以你也可以用這種方式阻止你的主線程,並且你可以很容易地鎖定整個程序。 – Mecki 2015-03-06 18:34:25

+0

好的呼籲實施重要。一個同步impl的例子。將[Spring's SyncTaskExecutor](http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/core/task/SyncTaskExecutor.html) – Drew 2015-03-09 14:26:59

9

的第一件事是先:你的類應按照Java Naming Conventions大寫字母開頭:

類名應該是名詞,採用大小寫混合的 的第一個字母,每個單詞大寫。嘗試 保持您的類名簡單, 描述性。使用整個單詞 - 避免使用 首字母縮寫詞和縮寫(除非 的縮寫比使用較長的格式(比如URL或 HTML)使用得更廣泛的 )。

二: 嘗試打破碼成連貫的部分,並組織他們周圍,你處理一些共同的特點......也許周圍的功能或你的編程模型。

服務器的(基本)模型是它只有它接收套接字連接...服務器依賴於處理程序來處理這些連接,就是這樣。如果試圖建立一個模型,它會是這個樣子:

class Server{ 
    private final ServerSocket serverSocket; 
    private final ExecutorService pool; 

    public Server(int port, int poolSize) throws IOException { 
     serverSocket = new ServerSocket(port); 
     pool = Executors.newFixedThreadPool(poolSize); 
    } 

    public void serve() { 
     try { 
     while(true) { 
      pool.execute(new Handler(serverSocket.accept())); 
     } 
     } catch (IOException ex) { 
     pool.shutdown(); 
     } 
    } 
    } 

    class Handler implements Runnable { 
    private final Socket socket; 
    Handler(Socket socket) { this.socket = socket; } 
    public void run() { 
     // receive the datagram packets 
    } 
} 

第三:我建議你看一些現有的例子。

每評論更新時間:
OK拉維,還有一些問題與您的代碼和一些未成年人問題:

  1. 我認爲Receive類是你的客戶......你應該把它作爲一個單獨的程序(帶有它自己的主類)並同時運行你的服務器和多個客戶端。從你的服務器產生一個新的「客戶端線程」,你發送的每一個新的UDP包都是一個令人不安的想法(big issue)。

  2. 當你讓你的客戶端應用程序,你應該讓運行接收的代碼在其自己的while環路(輕微問題),例如:

    public class Client extends Thread 
    { 
        public Client(/*..*/) 
        { 
         // initialize your client 
        } 
    
        public void run() 
        { 
         while(true) 
         { 
          // receive UDP packets 
          // process the UDP packets 
         } 
        } 
    
        public static void main(String[] args) throws IOException 
        { 
         // start your client 
         new Client().start(); 
        } 
    } 
    
  3. 你應該只需要每一個線程客戶端和每個服務器一個線程(因爲main有自己的線程,所以在技術上甚至沒有單獨的線程),所以你可能找不到有用的ExecutorService

否則,你的做法是正確的......但我仍然建議你看看一些例子。

+3

http://www.developer.com/ java/ent/article.php/3645111/Java-5s-BlockingQueue.htm - Doug Lea的'簡單服務器' – 2010-04-22 03:01:28

+0

@John啊,是的......感謝John,那就是我一直在尋找的東西。 – Kiril 2010-04-22 03:19:47

+0

+1好答案!!!!! – 2010-04-22 03:29:01

0

2個線程很好。一位讀者另一位作家。記住,使用UDP你不應該產生新的處理線程(除非你所做的需要很長時間),我建議把傳入的消息放入處理隊列中。對於發送來說也是一樣的,有一個發送線程可以阻塞UDP傳入隊列。

1

這是一件很好的事情,即使是在一天之後,Eclipse的歷史也能運行:)感謝這一點,我能夠給這兩個Ravi一個工作示例,並且Lirik就他的泄漏問題做出回答。

讓我先說一說,我不知道是什麼原因導致這種泄漏,但是如果我把它留下足夠長的時間,它將會在OutOfMemoryError上失敗。

其次,我將工作代碼留給Ravi作爲我的UDP服務器的工作基本示例。超時在那裏測試我的防火牆將終止接收器的時間(30秒)。只要移除池中的任何東西,你就可以走了。

因此,這裏是我的示例線程UDP服務器的工作,但泄漏版本。

public class TestServer { 

private static Integer TIMEOUT = 30; 
private final static int MAX_BUFFER_SIZE = 8192; 
private final static int MAX_LISTENER_THREADS = 5; 
private final static SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-dd-MM HH:mm:ss.SSSZ"); 

private int mPort; 
private DatagramSocket mSocket; 

// You can remove this for a working version 
private ExecutorService mPool; 

public TestServer(int port) { 
    mPort = port; 
    try { 
     mSocket = new DatagramSocket(mPort); 
     mSocket.setReceiveBufferSize(MAX_BUFFER_SIZE); 
     mSocket.setSendBufferSize(MAX_BUFFER_SIZE); 
     mSocket.setSoTimeout(0); 

     // You can uncomment this for a working version 
     //for (int i = 0; i < MAX_LISTENER_THREADS; i++) { 
     // new Thread(new Listener(mSocket)).start(); 
     //} 

     // You can remove this for a working version 
     mPool = Executors.newFixedThreadPool(MAX_LISTENER_THREADS); 

    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 

// You can remove this for a working version 
public void start() { 
    try { 
     try { 
      while (true) { 
       mPool.execute(new Listener(mSocket)); 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } finally { 
     mPool.shutdown(); 
    } 
} 

private class Listener implements Runnable { 

    private final DatagramSocket socket; 

    public Listener(DatagramSocket serverSocket) { 
     socket = serverSocket; 
    } 

    private String readLn(DatagramPacket packet) throws IOException { 
     socket.receive(packet); 
     return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(packet.getData())), MAX_BUFFER_SIZE).readLine(); 
    } 

    private void writeLn(DatagramPacket packet, String string) throws IOException { 
     packet.setData(string.concat("\r\n").getBytes()); 
     socket.send(packet); 
    } 

    @Override 
    public void run() { 
     DatagramPacket packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE); 
     String s; 
     while (true) { 
      try { 
       packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE); 
       s = readLn(packet); 
       System.out.println(DateFormat.format(new Date()) + " Received: " + s); 
       Thread.sleep(TIMEOUT * 1000); 
       writeLn(packet, s); 
       System.out.println(DateFormat.format(new Date()) + " Sent: " + s); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

public static void main(String[] args) { 
    if (args.length == 1) { 
     try { 
      TIMEOUT = Integer.parseInt(args[0]); 
     } catch (Exception e) { 
      TIMEOUT = 30; 
     } 
    } 
    System.out.println(DateFormat.format(new Date()) + " Timeout: " + TIMEOUT); 
    //new TestServer(4444); 
    new TestServer(4444).start(); 
} 
} 

btw。 @Lirik,我在Eclipse中首先目睹了這種行爲,之後我從命令行對它進行了測試。再次,我不知道是什麼導致它)對不起...