下面是如何使用發送批次不超過256KB限制的示例。 代碼來自這個回購協議(paolosalvatori/ServiceBusExtensions)
/// <summary>
/// This class contains extensions methods for the <see cref="EventHubClient"/> class.
/// </summary>
public static class EventHubClientExtensions
{
private const string EventDataListCannotBeNullOrEmpty = "The eventDataEnumerable parameter cannot be null or empty.";
private const string SendPartitionedBatchFormat = "[EventHubClient.SendPartitionedBatch] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
private const string SendPartitionedBatchAsyncFormat = "[EventHubClient.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
private const int MaxBathSizeInBytes = 262144;
/// <summary>
/// Asynchronously sends a batch of event data to the same partition.
/// All the event data in the batch need to have the same value in the Partitionkey property.
/// If the batch size is greater than the maximum batch size,
/// the method partitions the original batch into multiple batches,
/// each smaller in size than the maximum batch size.
/// </summary>
/// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param>
/// <param name="messages">An IEnumerable object containing event data instances.</param>
/// <param name="trace">true to cause a message to be written; otherwise, false.</param>
/// <returns>The asynchronous operation.</returns>
public static async Task SendPartitionedBatchAsync(this EventHubClient eventHubClient, IEnumerable<EventData> messages, bool trace = false)
{
var eventDataList = messages as IList<EventData> ?? messages.ToList();
if (messages == null || !eventDataList.Any())
{
throw new ArgumentNullException(EventDataListCannotBeNullOrEmpty);
}
var batchList = new List<EventData>();
long batchSize = 0;
foreach (var eventData in eventDataList)
{
if ((batchSize + eventData.SerializedSizeInBytes) > MaxBathSizeInBytes)
{
// Send current batch
await eventHubClient.SendBatchAsync(batchList);
Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));
// Initialize a new batch
batchList = new List<EventData> { eventData };
batchSize = eventData.SerializedSizeInBytes;
}
else
{
// Add the EventData to the current batch
batchList.Add(eventData);
batchSize += eventData.SerializedSizeInBytes;
}
}
// The final batch is sent outside of the loop
await eventHubClient.SendBatchAsync(batchList);
Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));
}
/// <summary>
/// Asynchronously sends a batch of event data to the same partition.
/// All the event data in the batch need to have the same value in the Partitionkey property.
/// If the batch size is greater than the maximum batch size,
/// the method partitions the original batch into multiple batches,
/// each smaller in size than the maximum batch size.
/// </summary>
/// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param>
/// <param name="messages">An IEnumerable object containing event data instances.</param>
/// <param name="trace">true to cause a message to be written; otherwise, false.</param>
public static void SendPartitionedBatch(this EventHubClient eventHubClient, IEnumerable<EventData> messages,
bool trace = false)
{
var eventDataList = messages as IList<EventData> ?? messages.ToList();
if (messages == null || !eventDataList.Any())
{
throw new ArgumentNullException(EventDataListCannotBeNullOrEmpty);
}
var batchList = new List<EventData>();
long batchSize = 0;
foreach (var eventData in eventDataList)
{
if ((batchSize + eventData.SerializedSizeInBytes) > MaxBathSizeInBytes)
{
// Send current batch
eventHubClient.SendBatch(batchList);
Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));
// Initialize a new batch
batchList = new List<EventData> { eventData };
batchSize = eventData.SerializedSizeInBytes;
}
else
{
// Add the EventData to the current batch
batchList.Add(eventData);
batchSize += eventData.SerializedSizeInBytes;
}
}
// The final batch is sent outside of the loop
eventHubClient.SendBatch(batchList);
Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batchSize, batchList.Count));
}
}
也許它可以直接與事件樞紐來完成,別人也許可以回答這個問題,但解決這一問題的傳統方法是分別,只是存儲數據在活動中發送一個ID。在azure中,您可以將您的數據存儲在存儲桶中,並只需通過id將其下載到處理事件的地方。 – alun
您的方案中單條消息有多大?由於你的帖子中存在矛盾,我有點困惑。你說單個消息保證小於256 KB,但接下來你將討論以256 KB爲單位分割單個消息。因此我的問題。 –
@Peter Bons - 我從Blob/CSV文件讀取。我正在寫一行這個blob/csv到我們的存儲器中並推送給EH。這個blob大小可能是8KB,也可能是256KB甚至500KB。我希望我的設計能夠考慮這個限制。我現在正在做的是,我將SendBatchAsync這些行用於EH。如果我單獨執行(每行讀取)並使用SendAsync,則不會面臨此限制,因爲1行CSV將小於256KB。希望澄清。 – khar