2013-03-21 66 views
2

我是新作家,請耐心等待。WCF雙工內的TPL數據流塊

我有一個雙工服務合同的WCF服務。這個服務合同有一個操作聯繫人,假設要進行長時間的數據處理。我受限制的併發數據處理的數量,我們可以說最大3.我的問題是,數據處理後,我需要回到相同的服務實例上下文,所以我回叫我的發起端點傳遞數據處理結果。我需要提到的是,由於各種原因,我受限於TPL數據流和WCF雙工。

這裏是一個演示,到目前爲止

我寫在一個控制檯庫我模擬WCF調用

class Program 
{ 
    static void Main(string[] args) 
    { 
     // simulate service calls 

     Enumerable.Range(0, 5).ToList().ForEach(x => 
     { 
      new System.Threading.Thread(new ThreadStart(async() => 
      { 
       var service = new Service(); 
       await service.Inc(x); 
      })).Start(); 
     }); 
    } 
} 

這裏是假設是WCF服務

// service contract 
public class Service 
{ 
    static TransformBlock<Message<int>, Message<int>> transformBlock; 

    static Service() 
    { 
     transformBlock = new TransformBlock<Message<int>, Message<int>>(x => Inc(x), new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = 3 
     }); 
    } 

    static Message<int> Inc(Message<int> input) 
    { 
     System.Threading.Thread.Sleep(100); 

     return new Message<int> { Token = input.Token, Data = input.Data + 1 }; 
    } 

    // operation contract 
    public async Task Inc(int id) 
    { 
     var token = Guid.NewGuid().ToString(); 

     transformBlock.Post(new Message<int> { Token = token, Data = id }); 

     while (await transformBlock.OutputAvailableAsync()) 
     { 
      Message<int> message; 
      if (transformBlock.TryReceive(m => m.Token == token, out message)) 
      { 
       // do further processing using initiator service instance members 
       // something like Callback.IncResult(m.Data); 
       break; 
      } 
     } 
    } 
} 

public class Message<T> 
{ 
    public string Token { get; set; } 

    public T Data { get; set; } 
} 

的操作契約並不是真的必須是異步的,但我需要OutputAvailableAsync通知。

這是一個好方法,還是有更好的解決方案,我的方案?

在此先感謝。

+0

「我不得不TPL數據流」你是什麼意思?你必須使用數據流?爲什麼?這沒有什麼意義。 – svick 2013-03-21 20:39:51

+0

我對數據處理的要求是確保併發性,但限制並行性。 TPL數據流塊看起來是一個不錯的選擇,再加上它們是由技術要求而不是PLinq或其他東西來強加的。 – uni3324 2013-03-21 21:02:15

+0

如果這真的是所有你會使用的數據流,那麼我認爲這是一個矯枉過正。你可以用簡單的代碼達到相同的效果(請參閱我的答案)。此外,我不確定雙工服務是否是正確的選擇。即使沒有,客戶端和服務器也可以是異步的。 – svick 2013-03-21 21:09:07

回答

1

首先,我認爲你不應該像你那樣使用令牌。進程間通信時唯一標識符很有用。但是當你在一個進程中時,只需使用引用平等。

要真正回答你的問題,我認爲(種)繁忙循環不是一個好主意。

一個簡單的異步調節解決方案是使用SemaphoreSlim。喜歡的東西:

static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(3); 

// operation contract 
public async Task Inc(int id) 
{ 
    await Semaphore.WaitAsync(); 

    try 
    { 
     Thread.Sleep(100); 
     var result = id + 1; 
     // do further processing using initiator service instance members 
     // something like Callback.IncResult(result); 
    } 
    finally 
    { 
     Semaphore.Release(); 
    } 
} 

如果你真的想(?或必須)使用數據流,你可以使用TaskCompletionSource運行和塊之間的同步。操作方法將等待對TaskCompletionSourceTask和塊將設置它,當它完成計算該消息:

private static readonly ActionBlock<Message<int>> Block = 
    new ActionBlock<Message<int>>(
     x => Inc(x), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = 3 
     }); 

static void Inc(Message<int> input) 
{ 
    Thread.Sleep(100); 

    input.TCS.SetResult(input.Data + 1); 
} 

// operation contract 
public async Task Inc(int id) 
{ 
    var tcs = new TaskCompletionSource<int>(); 

    Block.Post(new Message<int> { TCS = tcs, Data = id }); 

    int result = await tcs.Task; 
    // do further processing using initiator service instance members 
    // something like Callback.IncResult(result); 
}