2017-06-04 121 views
1

我有我需要處理的項目編號列表。一個項目可能有大約8000個項目,我需要獲取項目中每個項目的數據,然後將這些數據推送到服務器列表中。任何人都可以告訴我以下...BroadcastBlock缺失項目

1)我有1000個項目在iR但只有998被寫入服務器。通過使用broadCastBlock,我的物品是否鬆動? 2)我是否正確地等待所有actionBlocks? 3)如何使數據庫調用異步?

這裏是數據庫代碼

public MemcachedDTO GetIR(MemcachedDTO dtoItem) 
    { 

     string[] Tables = new string[] { "iowa", "la" }; 
     using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["test"].ConnectionString)) 
     { 
      using (SqlCommand command = new SqlCommand("test", connection)) 
      { 
       DataSet Result = new DataSet(); 
       command.CommandType = CommandType.StoredProcedure; 

       command.Parameters.Add("@ProjectId", SqlDbType.VarChar); 
       command.Parameters["@ProjectId"].Value = dtoItem.ProjectId; 


       connection.Open(); 
       Result.EnforceConstraints = false; 
       Result.Load(command.ExecuteReader(CommandBehavior.CloseConnection), LoadOption.OverwriteChanges, Tables); 
       dtoItem.test = Result; 
      } 
     } 
     return dtoItem; 
    } 

更新: 我的代碼更新到下面。它只是在我運行它時掛起,只寫入1/4的數據到服務器?你能讓我知道我做錯了什麼嗎?

 public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options) 
    { 
     var targetsList = targets.ToList(); 

     var block = new ActionBlock<T>(
      async item => 
      { 
       foreach (var target in targetsList) 
       { 
        await target.SendAsync(item); 
       } 
      }, new ExecutionDataflowBlockOptions 
      { 
       CancellationToken = options.CancellationToken 
      }); 

     block.Completion.ContinueWith(task => 
     { 
      foreach (var target in targetsList) 
      { 
       if (task.Exception != null) 
        target.Fault(task.Exception); 
       else 
        target.Complete(); 
      } 
     }); 

     return block; 
    } 

    [HttpGet] 
    public async Task< HttpResponseMessage> ReloadItem(string projectQuery) 
    { 
     try 
     { 

      var linkCompletion = new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 2 
      }; 
      var cts = new CancellationTokenSource(); 
      var dbOptions = new DataflowBlockOptions { CancellationToken = cts.Token }; 


      IList<string> projectIds = projectQuery.Split(',').ToList(); 
      IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>(); 

      var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
       dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }); 

      List<ActionBlock<MemcachedDTO>> actionList = new List<ActionBlock<MemcachedDTO>>(); 


      List<MemcachedDTO> dtoList = new List<MemcachedDTO>(); 

      foreach (string pid in projectIds) 
      { 
       IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>(); 
       dtoTemp = MemcachedDTO.GetItemIdsByProject(pid); 
       dtoList.AddRange(dtoTemp); 
      } 
      foreach (string s in serverList) 
      { 
       var action = new ActionBlock<MemcachedDTO>(
       async dto => await PostEachServerAsync(dto, s, "setitemcache")); 
       actionList.Add(action); 
      } 
      var bBlock = CreateGuaranteedBroadcastBlock(actionList, dbOptions); 

      foreach (MemcachedDTO d in dtoList) 
      { 
       await iR.SendAsync(d); 
      } 

      iR.Complete(); 
      iR.LinkTo(bBlock); 
      await Task.WhenAll(actionList.Select(action => action.Completion).ToList()); 

      return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" }); 
     } 
     catch (Exception ex) 
     { 
      return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() }); 
     } 
    } 

回答

0

1)我在IR 1000個項目,但只有998被寫入到服務器。通過使用broadCastBlock,我的物品是否鬆動?

是的,在下面的代碼中您將BoundedCapacity設置爲1,如果在任何時候您的BroadcastBlock無法傳遞一件物品,它將丟棄它。此外BroadcastBlock只會傳播Completion到一個TargetBlock,在這裏不要使用PropagateCompletion=true。如果您想要完成所有塊,則需要手動處理Completion。這可以通過設置BroadcastBlock上的ContinueWithCompletion傳遞給所有連接的目標來完成。

var action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, s, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = 1 }); 
broadcast.LinkTo(action, linkCompletion); 
actionList.Add(action); 

選項:取而代之的是BroadcastBlock的使用正確界定BufferBlock。當您的下游塊綁定到一個項目時,它們將無法接收其他項目,直到它們完成處理。這將允許BufferBlock將其物品提供給另一個可能空閒的ActionBlock

當您將項目添加到節流流程中時,即流程BoundedCapacity小於Unbounded。您需要使用SendAsync方法或至少處理Post的退貨。我建議你只需使用SendAsync

foreach (MemcachedDTO d in dtoList) 
{ 
    await iR.SendAsync(d); 
} 

這將迫使你的方法簽名成爲:

public async Task<HttpResponseMessage> ReloadItem(string projectQuery) 

2)我是否正確做等待有關的所有actionBlocks?

以前的變化將使你失去阻塞Wait呼叫贊成的await Task.WhenAlll

iR.Complete(); 
actionList.ForEach(x => x.Completion.Wait()); 

To: 

iR.Complete(); 
await bufferBlock.Completion.ContinueWith(tsk => actionList.ForEach(x => x.Complete()); 
await Task.WhenAll(actionList.Select(action => action.Completion).ToList()); 

3)如何使數據庫調用異步?

我要離開這個打開,因爲它應該是無關的TPL-Dataflow一個單獨的問題,但在短期使用async API來訪問你的數據庫和async將通過您的代碼庫自然生長。 This should get you started

BufferBlock VS BroadcastBlock

重新閱讀您的previous question和答案來自@VMAtm後。看來你想每個項目發送到全部五臺服務器,在這種情況下,你將需要一個BroadcastBlock。您可以使用BufferBlock將消息相對均勻地分發到各自可處理消息的靈活服務器池。無論如何,您仍然需要通過等待BroadcastBlock的完成來控制傳播完成和故障到所有連接的ActionBlocks

以防止BroadcastBlock丟棄的消息

一般來說,你兩個選項,設置您的ActionBlocks相綁定,這是他們的默認值:

new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = Unbounded }); 

或者廣播消息你的自我從任何各種各樣的你自己的建築。 @ i3arnon的Here is an example implementationAnd another來自@svick

+0

感謝您的詳細回覆@JSteward。我已經在上面的帖子中更新了我的代碼,並且仍然有問題...應用程序沒有完成(我刪除了PropagateCompletion),只寫了1/4的記錄。你能否指點我正確的方向?謝謝 – klkj898

+0

跳出來的第一件事是:在將所有數據發送到流中之前,您沒有將「TransformBlock」鏈接到「BroadcastBlock」。當你修復那部分會發生什麼? – JSteward

+0

另一種可能性是'PostEachServerAsync'未被等待,如果它正在運行'async',那麼它將被視爲火併遺忘,並且可能無法在流程完成時完成。 – JSteward