2016-12-05 97 views
1

我試圖將10000條記錄插入到Azure表存儲中。我正在使用ExecuteAsync()來實現它,但不知怎的,插入了約7500條記錄,其餘記錄都丟失了。我故意不使用等待關鍵字,因爲我不想等待結果,只是想將它們存儲在表中。以下是我的代碼片段。Azure表存儲的ExecuteAsync()存儲無法插入所有記錄

private static async void ConfigureAzureStorageTable() 
    { 
     CloudStorageAccount storageAccount = 
      CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString")); 
     CloudTableClient tableClient = storageAccount.CreateCloudTableClient(); 
     TableResult result = new TableResult(); 
     CloudTable table = tableClient.GetTableReference("test"); 
     table.CreateIfNotExists(); 

     for (int i = 0; i < 10000; i++) 
     { 
      var verifyVariableEntityObject = new VerifyVariableEntity() 
      { 
       ConsumerId = String.Format("{0}", i), 
       Score = String.Format("{0}", i * 2 + 2), 
       PartitionKey = String.Format("{0}", i), 
       RowKey = String.Format("{0}", i * 2 + 2) 
      }; 
      TableOperation insertOperation = TableOperation.Insert(verifyVariableEntityObject); 
      try 
      { 
       table.ExecuteAsync(insertOperation); 
      } 
      catch (Exception e) 
      { 

       Console.WriteLine(e.Message); 
      } 
     } 
    } 

該方法的用法有什麼不正確?

+1

如果您不等它完成,它可能無法完成(特別是如果您的過程退出)。而且你不會發現任何錯誤。 – SLaks

+0

如果你不等它完成,它可能不會完成! – DavidG

+0

你解決了這個問題,有沒有更新?您可以在控制檯應用程序中捕捉到詳細的異常,並在將記錄插入到Azure表存儲器時通過Fiddler捕獲網絡包。 –

回答

3

仍然想要await table.ExecuteAsync()。這將意味着ConfigureAzureStorageTable()在那一點返回控制給調用者,它可以繼續執行。

你有它的問題的方式,ConfigureAzureStorageTable()將會繼續通過調用table.ExecuteAsync()和退出,事情就是這樣table會走出去的範圍,而table.ExecuteAsync()任務仍然沒有完成。

在SO和其他地方使用async void有很多警告,您還需要考慮。你可以很容易有你的方法async Task但在主叫不等待,但讓它返回Task周圍乾淨終止等

編輯:一個另外 - 你幾乎可以肯定要使用ConfigureAwait(false)在你的await那裏,因爲你似乎不需要保存任何上下文。這個blog post有一些關於這個和異步的指導方針。

1

如何使用TableBatchOperation一次運行批次的N個刀片?

private const int BatchSize = 100; 

private static async void ConfigureAzureStorageTable() 
{ 
    CloudStorageAccount storageAccount = 
     CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString")); 
    CloudTableClient tableClient = storageAccount.CreateCloudTableClient(); 
    TableResult result = new TableResult(); 
    CloudTable table = tableClient.GetTableReference("test"); 
    table.CreateIfNotExists(); 

    var batchOperation = new TableBatchOperation(); 

    for (int i = 0; i < 10000; i++) 
    { 
     var verifyVariableEntityObject = new VerifyVariableEntity() 
     { 
      ConsumerId = String.Format("{0}", i), 
      Score = String.Format("{0}", i * 2 + 2), 
      PartitionKey = String.Format("{0}", i), 
      RowKey = String.Format("{0}", i * 2 + 2) 
     }; 
     TableOperation insertOperation = TableOperation.Insert(verifyVariableEntityObject); 
     batchOperation.Add(insertOperation); 

     if (batchOperation.Count >= BatchSize) 
     { 
      try 
      { 
       await table.ExecuteBatchAsync(batchOperation); 
       batchOperation = new TableBatchOperation(); 
      } 
      catch (Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 
    } 

    if(batchOperation.Count > 0) 
    { 
     try 
     { 
      await table.ExecuteBatchAsync(batchOperation); 
     } 
     catch (Exception e) 
     { 
      Console.WriteLine(e.Message); 
     } 
    } 
} 

您可以根據需要調整BatchSize。小聲免責聲明:我沒有試圖運行這個,儘管它應該可以工作。

但我不禁想知道爲什麼你的功能是async void?這應該保留給事件處理程序和類似的,你不能決定接口。在大多數情況下,你想返回一個任務。因爲現在調用者無法捕獲在此函數中發生的異常。

1

根據您的要求,我已經成功使用CloudTable.ExecuteAsyncCloudTable.ExecuteBatchAsync測試了您的情況。以下是我使用CloudTable.ExecuteBatchAsync將記錄插入到Azure表存儲的代碼片段,您可以參考它。

程序。CS主要

class Program 
{ 
    static void Main(string[] args) 
    { 
     CloudStorageAccount storageAccount = 
      CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString")); 
     CloudTableClient tableClient = storageAccount.CreateCloudTableClient(); 
     TableResult result = new TableResult(); 
     CloudTable table = tableClient.GetTableReference("test"); 
     table.CreateIfNotExists(); 

     //Generate records to be inserted into Azure Table Storage 
     var entities = Enumerable.Range(1, 10000).Select(i => new VerifyVariableEntity() 
     { 
      ConsumerId = String.Format("{0}", i), 
      Score = String.Format("{0}", i * 2 + 2), 
      PartitionKey = String.Format("{0}", i), 
      RowKey = String.Format("{0}", i * 2 + 2) 
     }); 

     //Group records by PartitionKey and prepare for executing batch operations 
     var batches = TableBatchHelper<VerifyVariableEntity>.GetBatches(entities); 

     //Execute batch operations in parallel 
     Parallel.ForEach(batches, new ParallelOptions() 
     { 
      MaxDegreeOfParallelism = 5 
     }, (batchOperation) => 
     { 
      try 
      { 
       table.ExecuteBatch(batchOperation); 
       Console.WriteLine("Writing {0} records", batchOperation.Count); 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine("ExecuteBatch throw a exception:" + ex.Message); 
      } 
     }); 
     Console.WriteLine("Done!"); 
     Console.WriteLine("Press any key to exit..."); 
     Console.ReadKey(); 
    } 
} 

TableBatchHelper.cs

public class TableBatchHelper<T> where T : ITableEntity 
{ 
    const int batchMaxSize = 100; 

    public static IEnumerable<TableBatchOperation> GetBatches(IEnumerable<T> items) 
    { 
     var list = new List<TableBatchOperation>(); 
     var partitionGroups = items.GroupBy(arg => arg.PartitionKey).ToArray(); 
     foreach (var group in partitionGroups) 
     { 
      T[] groupList = group.ToArray(); 
      int offSet = batchMaxSize; 
      T[] entities = groupList.Take(offSet).ToArray(); 
      while (entities.Any()) 
      { 
       var tableBatchOperation = new TableBatchOperation(); 
       foreach (var entity in entities) 
       { 
        tableBatchOperation.Add(TableOperation.InsertOrReplace(entity)); 
       } 
       list.Add(tableBatchOperation); 
       entities = groupList.Skip(offSet).Take(batchMaxSize).ToArray(); 
       offSet += batchMaxSize; 
      } 
     } 
     return list; 
    } 
} 

注:作爲官方document提到有關將一批實體:

一個批處理操作可包括高達 enti領帶。

單個批處理操作中的所有實體必須具有相同的分區鍵

總之,請嘗試檢查它是否可以在您身邊工作。此外,您可以捕獲控制檯應用程序中的詳細異常,並通過Fiddler捕獲HTTP請求,以在將記錄插入到Azure表存儲器時捕獲HTTP錯誤請求。