2012-02-15 60 views
2

Rrom C#,通過SOCKET讀/寫到JAVA併產生一些併發/套接字問題。C#通過讀寫SOCKET到JAVA併發生一些併發/套接字問題

我想實現一個服務器客戶端應用程序,其中服務器是Java和客戶端是C#。他們通過TCP/IP進行通信並在它們之間交換一些二進制數據。

特別是我有一個在Java和C#中定義的數據包類。它有一個標題,鍵和值。 Java和C#都以完全相同的方式寫入和讀取數據包到套接字。通過這種方式,我可以從C#發送請求數據包,在Java服務器上處理它並將響應作爲數據包發回。

原來的問題是更復雜的方式,但我可以把它煮到這個「簡單」的版本。

我已經實現了服務器和客戶端,如下所述。代碼也可在底部找到。

對我說出你不得不繼續閱讀:)

服務器(Java)的側

在服務器端的問題,我有一個很虛的ServerSocket使用。它讀取傳入的數據包併發回與響應幾乎相同的數據包。

客戶端(C#)端 客戶端有點複雜。客戶端啓動N(可配置)線程數(我將稱它們爲用戶線程)。一個在線程和一個在線程。所有用戶線程都使用虛擬請求數據包和唯一ID創建一個Call對象。然後將該呼叫添加到本地BlockingCollection中。

的輸出線程連續讀取本地BlockingCollection和服務器也連續發送所有請求數據包到服務器

將在線程讀取響應數據包,並將其匹配到呼叫對象(還記得唯一的呼叫ID)。

如果在5秒的時間間隔內沒有響應特定的Call對象,用戶線程將通過打印到控制檯來投訴它。

還有一個定時器,間隔10秒,打印每秒執行多少事務。

如果您到達目前爲止,謝謝:)。

現在的問題:

下面的代碼,這是一個什麼樣上述我在Mac上使用Mono正常工作的落實。在Windows上,它也不會立即以較低的用戶線程數(< 10)失敗。隨着我突然增加線程數量,客戶端收到的響應數據包以某種方式被破壞。在這個應用程序方面,所有用戶線程都被阻塞,因爲他們的請求的答案沒有收到。 問題是爲什麼他們被損壞?正如你看到的touche socket的線程是In和Out線程。但不知何故,用戶線程的數量會影響客戶端並使其剎車。

它看起來像一些併發或套接字問題,但我可以找到它。

我已經把服務器(Java)和客戶端(C#)的代碼。他們沒有任何依賴關係,只是編譯和運行兩個主要方法(第一臺服務器)顯示問題。

我很欣賞你讀到目前爲止。

服務器代碼

import java.io.*; 
import java.net.*; 
import java.nio.ByteBuffer; 

public class DummyServer { 

public static void main(String[] args) throws IOException { 
    ServerSocket server = new ServerSocket(9900); 
    System.out.println("Server started"); 
    for(;;){ 
     final Socket socket = server.accept(); 
     System.out.println("Accepting a connection"); 
     new Thread(new Runnable(){ 
      public void run() { 
       try { 
        System.out.println("Thread started to handle the connection"); 
        DataInputStream dis = new DataInputStream(socket.getInputStream()); 
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); 
        for(int i=0; ; i++){ 
         Packet packet = new Packet(); 
         packet.readFrom(dis); 
         packet.key = null; 
         packet.value = new byte[1000]; 
         packet.writeTo(dos); 
        } 
       } catch (IOException e) { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 
    } 
} 
public static class Packet { 
    byte[] key; 
    byte[] value; 
    long callId = -1; 
    private int valueHash = -1; 

    public void writeTo(DataOutputStream outputStream) throws IOException { 
     final ByteBuffer writeHeaderBuffer = ByteBuffer.allocate(1 << 10); // 1k 
     writeHeaderBuffer.clear(); 
     writeHeaderBuffer.position(12); 
     writeHeaderBuffer.putLong(callId); 
     writeHeaderBuffer.putInt(valueHash); 
     int size = writeHeaderBuffer.position(); 
     int headerSize = size - 12; 
     writeHeaderBuffer.position(0); 
     writeHeaderBuffer.putInt(headerSize); 
     writeHeaderBuffer.putInt((key == null) ? 0 : key.length); 
     writeHeaderBuffer.putInt((value == null) ? 0 : value.length); 
     outputStream.write(writeHeaderBuffer.array(), 0, size); 
     if (key != null)outputStream.write(key); 
     if (value != null)outputStream.write(value); 
    } 

    public void readFrom(DataInputStream dis) throws IOException { 
     final ByteBuffer readHeaderBuffer = ByteBuffer.allocate(1 << 10); 
     final int headerSize = dis.readInt(); 
     int keySize = dis.readInt(); 
     int valueSize = dis.readInt(); 
     readHeaderBuffer.clear(); 
     readHeaderBuffer.limit(headerSize); 
     dis.readFully(readHeaderBuffer.array(), 0, headerSize); 
     this.callId = readHeaderBuffer.getLong(); 
     valueHash = readHeaderBuffer.getInt(); 
     key = new byte[keySize]; 
     dis.readFully(key); 
     value = new byte[valueSize]; 
     dis.readFully(value); 
    } 
} 

}

C#的客戶端代碼:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Net.Sockets; 
using System.Net; 
using System.IO; 
using System.Collections.Concurrent; 
using System.Threading; 

namespace Client 
{ 
public class Program 
{ 
    readonly ConcurrentDictionary<long, Call> calls = new ConcurrentDictionary<long, Call>(); 
    readonly BlockingCollection<Call> outThreadQueue = new BlockingCollection<Call>(1000); 
    readonly TcpClient tcpClient = new TcpClient("localhost", 9900); 
    readonly private int THREAD_COUNT; 
    static int ops; 

    public static void Main(string[] args) { 
     new Program(args.Length > 0 ? int.Parse(args[0]) : 100).Start(); 
    } 
    public Program(int threadCount) { 
     this.THREAD_COUNT = threadCount; 
     new Thread(new ThreadStart(this.InThreadRun)).Start();//start the InThread 
     new Thread(new ThreadStart(this.OutThreadRun)).Start();//start the OutThread 
    } 
    public void Start(){ 
     for (int i = 0; i < THREAD_COUNT; i++) 
      new Thread(new ThreadStart(this.Call)).Start(); 
     Console.WriteLine(THREAD_COUNT + " User Threads started to perform server call"); 
     System.Timers.Timer aTimer = new System.Timers.Timer(10000); 
     aTimer.Elapsed += new System.Timers.ElapsedEventHandler(this.Stats); 
     aTimer.Enabled = true; 
    } 
    public void Stats(object source, System.Timers.ElapsedEventArgs e){ 
     Console.WriteLine("Ops per second: " + Interlocked.Exchange(ref ops, 0)/10); 
    } 
    public void Call() { 
     for (; ;){ 
      Call call = new Call(new Packet()); 
      call.request.key = new byte[10]; 
      call.request.value = new byte[1000]; 
      outThreadQueue.Add(call); 
      Packet result = null; 
      for (int i = 1;result==null ; i++){ 
       result = call.getResult(5000); 
       if(result==null) Console.WriteLine("Call" + call.id + " didn't get answer within "+ 5000*i/1000 + " seconds"); 
      } 
      Interlocked.Increment(ref ops); 
     } 
    } 
    public void InThreadRun(){ 
     for (; ;){ 
      Packet packet = new Packet(); 
      packet.Read(tcpClient.GetStream()); 
      Call call; 
      if (calls.TryGetValue(packet.callId, out call)) 
       call.inbQ.Add(packet); 
      else 
       Console.WriteLine("Unkown call result: " + packet.callId); 
     } 
    } 
    public void OutThreadRun() { 
     for (; ;){ 
      Call call = outThreadQueue.Take(); 
      calls.TryAdd(call.id, call); 
      Packet packet = call.request; 
      if (packet != null) packet.write(tcpClient.GetStream()); 
     } 
    } 
} 
public class Call 
{ 
    readonly public long id; 
    readonly public Packet request; 
    static long callIdGen = 0; 
    readonly public BlockingCollection<Packet> inbQ = new BlockingCollection<Packet>(1); 
    public Call(Packet request) 
    { 
     this.id = incrementCallId(); 
     this.request = request; 
     this.request.callId = id; 
    } 
    public Packet getResult(int timeout) 
    { 
     Packet response = null; 
     inbQ.TryTake(out response, timeout); 
     return response; 
    } 
    private static long incrementCallId() 
    { 
     long initialValue, computedValue; 
     do 
     { 
      initialValue = callIdGen; 
      computedValue = initialValue + 1; 
     } while (initialValue != Interlocked.CompareExchange(ref callIdGen, computedValue, initialValue)); 
     return computedValue; 
    } 
} 

public class Packet 
{ 
    public byte[] key; 
    public byte[] value; 
    public long callId = 0; 
    public void write(Stream stream) 
    { 
     MemoryStream header = new MemoryStream(); 
     using (BinaryWriter writer = new BinaryWriter(header)) 
     { 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder((long)callId)); 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder((int)-1)); 
     } 
     byte[] headerInBytes = header.ToArray(); 
     MemoryStream body = new MemoryStream(); 
     using (BinaryWriter writer = new BinaryWriter(body)) 
     { 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder(headerInBytes.Length)); 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder(key == null ? 0 : key.Length)); 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder(value == null ? 0 : value.Length)); 
      writer.Write(headerInBytes); 
      if (key != null) writer.Write(key); 
      if (value != null) writer.Write(value); 
      byte[] packetInBytes = body.ToArray(); 
      stream.Write(packetInBytes, 0, packetInBytes.Length); 
     } 
    } 
    public void Read(Stream stream) 
    { 
     BinaryReader reader = new BinaryReader(stream); 
     int headerSize = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     int keySize = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     int valueSize = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     this.callId = IPAddress.NetworkToHostOrder(reader.ReadInt64()); 
     int valuePartitionHash = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     this.key = new byte[keySize]; 
     this.value = new byte[valueSize]; 
     if (keySize > 0) reader.Read(this.key, 0, keySize); 
     if (valueSize > 0) reader.Read(this.value, 0, valueSize); 
    } 
} 

}

回答

2

這是一個非常常見的錯誤:任何Read調用套接字可能實際上不會讀取儘可能多的字節數,如果它們當前不可用。 Read將返回每個調用讀取的字節數。如果您希望讀取n個字節的數據,則需要多次調用讀取,直到讀取的字節數加起來爲n。

+0

你是對的,我改變了Read()方法到我從Java DataInputStream中作弊的readFully方法,它工作。非常感謝。高度讚賞! – 2012-02-15 23:45:32