2017-10-12 107 views
0

我正在使用Azure事件中心。我打算髮送活動並使用SendBatchAsync。我看到事件中心有256KB的限制(無論是單獨或批量發送)將大於256KB的事件推送到Azure EventHub

所以,如果我的數據> 256KB,什麼是最佳實踐來處理呢?我們是否應該單獨發送消息(這將保證它是< 256KB)?

另外,如何將事件分成256KB塊併發送到Event Hub? 我看了看Azure文檔,看到他們推薦使用EventHubClient.CreateBatch但我沒有看到足夠的示例。有人可以在我們將如何最終分裂爲256KB和SendBatchAsync

在這裏需要一些樣機或樣品或步驟提供是我做的(但不因子256KB限制)

await myEventHubClient.SendBatchAsync(
    events.Select( 
     iEvent => 
      new EventData(
       Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(iEvent))))); 
+0

也許它可以直接與事件樞紐來完成,別人也許可以回答這個問題,但解決這一問題的傳統方法是分別,只是存儲數據在活動中發送一個ID。在azure中,您可以將您的數據存儲在存儲桶中,並只需通過id將其下載到處理事件的地方。 – alun

+1

您的方案中單條消息有多大?由於你的帖子中存在矛盾,我有點困惑。你說單個消息保證小於256 KB,但接下來你將討論以256 KB爲單位分割單個消息。因此我的問題。 –

+0

@Peter Bons - 我從Blob/CSV文件讀取。我正在寫一行這個blob/csv到我們的存儲器中並推送給EH。這個blob大小可能是8KB,也可能是256KB甚至500KB。我希望我的設計能夠考慮這個限制。我現在正在做的是,我將SendBatchAsync這些行用於EH。如果我單獨執行(每行讀取)並使用SendAsync,則不會面臨此限制,因爲1行CSV將小於256KB。希望澄清。 – khar

回答

1

不要輕易嘗試並且將多個消息中的單個有效負載分塊,考慮到事件不能保證按順序到達,可能最終在不同的分區中並且可能到達一定的時間間隔,並且可以獨立於「總線」。這在規模上變得更具挑戰性。

選項1:如果您的有效負載是text/json,並且您的消息大小不超過256kb,請考慮壓縮有效負載。您可以從this online tool獲得尺寸結果的概念。如果您的有效負載中有大量空白(例如JSON),則也可以使用minify而不是壓縮。

選項2:寧可將您的有效內容存儲在外部存儲中(比如DocumentDB或Blob存儲)。

在事件中發送對您的有效負載的引用。這使您的事件保持精簡,但您的消費者需要了解如何檢索有效負載。一種簡單的方法是將有效負載的鏈接作爲URI呈現,顯然,您可能需要考慮有效負載存儲上的身份驗證和授權,以符合您的Event Hub訪問策略,以保持其光滑。

+0

謝謝@ Murray-Foxcroft。我喜歡你列出的2個選項。但是,我想考慮使用類似EventHubClient.CreateBatch的Azure團隊提供的選項。因此,我需要更多關於如何使用它的例子/信息。 – khar

+0

批處理是將消息捆綁在一起以達到性能或時間原因,而不是創建一組相關消息。每條消息都應該獨立存在。讓我知道你是怎麼辦的。 –

0

下面是如何使用發送批次不超過256KB限制的示例。 代碼來自這個回購協議(paolosalvatori/ServiceBusExtensions

/// <summary> 
/// This class contains extensions methods for the <see cref="EventHubClient"/> class. 
/// </summary> 
public static class EventHubClientExtensions 
{ 
    private const string EventDataListCannotBeNullOrEmpty = "The eventDataEnumerable parameter cannot be null or empty."; 
    private const string SendPartitionedBatchFormat = "[EventHubClient.SendPartitionedBatch] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]"; 
    private const string SendPartitionedBatchAsyncFormat = "[EventHubClient.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]"; 
    private const int MaxBathSizeInBytes = 262144; 

    /// <summary> 
    /// Asynchronously sends a batch of event data to the same partition. 
    /// All the event data in the batch need to have the same value in the Partitionkey property. 
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size. 
    /// </summary> 
    /// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param> 
    /// <param name="messages">An IEnumerable object containing event data instances.</param> 
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param> 
    /// <returns>The asynchronous operation.</returns> 
    public static async Task SendPartitionedBatchAsync(this EventHubClient eventHubClient, IEnumerable<EventData> messages, bool trace = false) 
    { 
     var eventDataList = messages as IList<EventData> ?? messages.ToList(); 
     if (messages == null || !eventDataList.Any()) 
     { 
      throw new ArgumentNullException(EventDataListCannotBeNullOrEmpty); 
     } 

     var batchList = new List<EventData>(); 
     long batchSize = 0; 

     foreach (var eventData in eventDataList) 
     { 
      if ((batchSize + eventData.SerializedSizeInBytes) > MaxBathSizeInBytes) 
      { 
       // Send current batch 
       await eventHubClient.SendBatchAsync(batchList); 
       Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count)); 

       // Initialize a new batch 
       batchList = new List<EventData> { eventData }; 
       batchSize = eventData.SerializedSizeInBytes; 
      } 
      else 
      { 
       // Add the EventData to the current batch 
       batchList.Add(eventData); 
       batchSize += eventData.SerializedSizeInBytes; 
      } 
     } 
     // The final batch is sent outside of the loop 
     await eventHubClient.SendBatchAsync(batchList); 
     Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count)); 
    } 

    /// <summary> 
    /// Asynchronously sends a batch of event data to the same partition. 
    /// All the event data in the batch need to have the same value in the Partitionkey property. 
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size. 
    /// </summary> 
    /// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param> 
    /// <param name="messages">An IEnumerable object containing event data instances.</param> 
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param> 
    public static void SendPartitionedBatch(this EventHubClient eventHubClient, IEnumerable<EventData> messages, 
     bool trace = false) 
    { 
     var eventDataList = messages as IList<EventData> ?? messages.ToList(); 
     if (messages == null || !eventDataList.Any()) 
     { 
      throw new ArgumentNullException(EventDataListCannotBeNullOrEmpty); 
     } 

     var batchList = new List<EventData>(); 
     long batchSize = 0; 

     foreach (var eventData in eventDataList) 
     { 
      if ((batchSize + eventData.SerializedSizeInBytes) > MaxBathSizeInBytes) 
      { 
       // Send current batch 
       eventHubClient.SendBatch(batchList); 
       Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count)); 

       // Initialize a new batch 
       batchList = new List<EventData> { eventData }; 
       batchSize = eventData.SerializedSizeInBytes; 
      } 
      else 
      { 
       // Add the EventData to the current batch 
       batchList.Add(eventData); 
       batchSize += eventData.SerializedSizeInBytes; 
      } 
     } 
     // The final batch is sent outside of the loop 
     eventHubClient.SendBatch(batchList); 
     Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batchSize, batchList.Count)); 
    } 
}