
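Creating a delay between two message reads of a queue?

I am using an Azure queue to perform a bulk import, and WebJobs to run the process in the background. Messages are dequeued very frequently. How do I create a delay between two message reads?

This is how I add a message to the queue: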

public async Task<bool> Handle(CreateFileUploadCommand message)
{
    var queueClient = _queueService.GetQueueClient(Constants.Queues.ImportQueue);

    var brokeredMessage = new BrokeredMessage(JsonConvert.SerializeObject(new ProcessFileUploadMessage
    {
        TenantId = message.TenantId,
        FileExtension = message.FileExtension,
        FileName = message.Name,
        DeviceId = message.DeviceId,
        SessionId = message.SessionId,
        UserId = message.UserId,
        OutletId = message.OutletId,
        CorrelationId = message.CorrelationId,
    }))
    {
        ContentType = "application/json",
    };

    await queueClient.SendAsync(brokeredMessage);

    return true;
}

And below is the WebJobs function:

public class Functions
{
    private readonly IValueProvider _valueProvider;

    public Functions(IValueProvider valueProvider)
    {
        _valueProvider = valueProvider;
    }

    public async Task ProcessQueueMessage(
        [ServiceBusTrigger(Constants.Constants.Queues.ImportQueue)] BrokeredMessage message,
        TextWriter logger)
    {
        var queueMessage = message.GetBody<string>();

        using (var client = new HttpClient())
        {
            client.BaseAddress = new Uri(_valueProvider.Get("ServiceBaseUri"));

            var stringContent = new StringContent(queueMessage, Encoding.UTF8, "application/json");

            var result = await client.PostAsync(RestfulUrls.ImportMenu.ProcessUrl, stringContent);

            if (result.IsSuccessStatusCode)
            {
                await message.CompleteAsync();
            }
            else
            {
                await message.AbandonAsync();
            }
        }
    }
}

Any update? If you find my answer useful or helpful, please mark it as the answer so that others can benefit from it.

Answer


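As far as I know, the Azure WebJobs SDK enables concurrent processing on a single instance (the default is 16).

When the WebJob runs, it reads up to 16 queue messages at a time. Each message is received under a peek lock; the SDK calls Complete on it if the function finishes successfully and Abandon otherwise. The trigger function is then invoked for all of those messages concurrently, which is why the queue appears to be dequeued so frequently.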

If you want to disable parallel processing on a single instance, I suggest setting MessageOptions.MaxConcurrentCalls on the ServiceBusConfiguration to 1.

For more details, refer to the code below.

In Program.cs:

JobHostConfiguration config = new JobHostConfiguration(); 
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration(); 
serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1; 
config.UseServiceBus(serviceBusConfig); 

JobHost host = new JobHost(config); 
host.RunAndBlock(); 

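If you want to create a delay between two message reads, I suggest creating a custom ServiceBusConfiguration.MessagingProvider.

The MessageProcessor it creates has a CompleteProcessingMessageAsync method, which completes processing of the specified message after the job function has been invoked.

I suggest adding a Thread.Sleep call in CompleteProcessingMessageAsync to delay the next read.

For more details, refer to the following code sample: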

CustomMessagingProvider.cs:

Note: I override the CompleteProcessingMessageAsync method.

public class CustomMessagingProvider : MessagingProvider
{
    private readonly ServiceBusConfiguration _config;

    public CustomMessagingProvider(ServiceBusConfiguration config)
        : base(config)
    {
        _config = config;
    }

    public override NamespaceManager CreateNamespaceManager(string connectionStringName = null)
    {
        // you could return your own NamespaceManager here, which would be used
        // globally
        return base.CreateNamespaceManager(connectionStringName);
    }

    public override MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null)
    {
        // you could return a customized (or new) MessagingFactory here per entity
        return base.CreateMessagingFactory(entityPath, connectionStringName);
    }

    public override MessageProcessor CreateMessageProcessor(string entityPath)
    {
        // demonstrates how to plug in a custom MessageProcessor
        // you could use the global MessageOptions, or use different
        // options per entity
        return new CustomMessageProcessor(_config.MessageOptions);
    }

    private class CustomMessageProcessor : MessageProcessor
    {
        public CustomMessageProcessor(OnMessageOptions messageOptions)
            : base(messageOptions)
        {
        }

        public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
        {
            // intercept messages before the job function is invoked
            return base.BeginProcessingMessageAsync(message, cancellationToken);
        }

        public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
        {
            if (result.Succeeded)
            {
                if (!MessageOptions.AutoComplete)
                {
                    // AutoComplete is true by default, but if set to false
                    // we need to complete the message
                    cancellationToken.ThrowIfCancellationRequested();

                    await message.CompleteAsync();

                    Console.WriteLine("Begin sleep");
                    // Sleep 5 seconds
                    Thread.Sleep(5000);
                    Console.WriteLine("Sleep 5 seconds");
                }
            }
            else
            {
                cancellationToken.ThrowIfCancellationRequested();
                await message.AbandonAsync();
            }
        }
    }
}

Main method in Program.cs:

static void Main()
{
    var config = new JobHostConfiguration();

    if (config.IsDevelopment)
    {
        config.UseDevelopmentSettings();
    }

    var sbConfig = new ServiceBusConfiguration
    {
        MessageOptions = new OnMessageOptions
        {
            AutoComplete = false,
            MaxConcurrentCalls = 1
        }
    };
    sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
    config.UseServiceBus(sbConfig);
    var host = new JobHost(config);

    // The following code ensures that the WebJob will be running continuously
    host.RunAndBlock();
}


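Because this is an async method, you should replace Thread.Sleep() with await Task.Delay(). That way the underlying thread is not fully blocked and can handle other tasks in the meantime. – Oliver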
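
The suggestion in this comment could be sketched roughly as follows. This is a minimal illustration rather than the answerer's code: DelayAfterCompletion and CompleteThenDelayAsync are hypothetical names introduced here, not part of the WebJobs SDK, and the helper takes the completion step as a delegate so that it compiles without the Service Bus packages.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DelayAfterCompletion
{
    // Hypothetical helper (not part of the WebJobs SDK): settles the message
    // through the supplied delegate, then waits without blocking a thread.
    public static async Task CompleteThenDelayAsync(
        Func<Task> completeAsync,
        TimeSpan delay,
        CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // For example: () => message.CompleteAsync() when called from the processor.
        await completeAsync();

        // Unlike Thread.Sleep, Task.Delay releases the thread while waiting and
        // is cancelled (throwing TaskCanceledException) if the token is signalled.
        await Task.Delay(delay, cancellationToken);
    }
}
```

Inside CompleteProcessingMessageAsync, the await message.CompleteAsync() and Thread.Sleep(5000) pair could then become a single call such as await DelayAfterCompletion.CompleteThenDelayAsync(() => message.CompleteAsync(), TimeSpan.FromSeconds(5), cancellationToken). Passing the cancellation token also lets the wait end early when the host shuts down, which Thread.Sleep cannot do.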