1

我遇到了一個工作者角色(Worker Role)的問題:角色似乎成功啓動並註冊了EventProcessor類的實現(SimpleEventProcessorA和SimpleEventProcessorB),但它們都沒有接收到任何事件。也就是說,IEventProcessor.ProcessEventsAsync方法根本沒有被調用。(問題標題:Azure雲服務 - 事件處理器IEventProcessor.ProcessEventsAsync沒有被調用)

每個EventProcessor類都有它自己的事件中心(Event Hub)。

我的日誌顯示EventProcessor類的構造函數和OpenAsync方法都被調用了。事實上,如下所示,它們各被調用了4次。但在此之後就沒有任何後續活動。我猜4次是因爲有四個分區。

SimpleEventProcessorA - Constructor 
SimpleEventProcessorA - OpenAsync 
SimpleEventProcessorA - Constructor 
SimpleEventProcessorA - OpenAsync 
SimpleEventProcessorA - Constructor 
SimpleEventProcessorA - OpenAsync 
SimpleEventProcessorB - Constructor 
SimpleEventProcessorB - Open Async 
SimpleEventProcessorB - Constructor 
SimpleEventProcessorB - OpenAsync 
SimpleEventProcessorB - Constructor 
SimpleEventProcessorB - OpenAsync 
SimpleEventProcessorB - Constructor 
SimpleEventProcessorB - OpenAsync 
SimpleEventProcessorA - Constructor 
SimpleEventProcessorA - OpenAsync 

工作者角色的RunAsync方法中也沒有在EventProcessorOptions裡指定偏移量(offset),因此所有事件應該都會湧入才對。

而且,當我觸發事件時,可以在Azure門戶中看到事件確實傳入了。

工作者角色代碼,註冊EventProcessor:

/// <summary>
/// Worker role entry point that hosts two Event Hub processors: one
/// <see cref="EventProcessorHost"/> per hub, each created in <see cref="OnStart"/>,
/// registered in <see cref="RunAsync"/> and unregistered in <see cref="OnStop"/>.
/// </summary>
public class WorkerRole : RoleEntryPoint
{
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

    private EventProcessorHost eventProcessorHostA;
    private EventProcessorHost eventProcessorHostB;

    /// <summary>
    /// Runs the role: blocks on <see cref="RunAsync"/> until it completes and then
    /// signals <c>runCompleteEvent</c> so that <see cref="OnStop"/> can finish.
    /// </summary>
    public override void Run()
    {
        Trace.TraceInformation("ReportWorkerRole is running");

        try
        {
            this.RunAsync(this.cancellationTokenSource.Token).Wait();
        }
        finally
        {
            // Always release OnStop, even when RunAsync faulted.
            this.runCompleteEvent.Set();
        }
    }

    /// <summary>
    /// Reads the Event Hub and storage settings and creates one processor host per hub.
    /// </summary>
    /// <returns>
    /// The result of <c>base.OnStart()</c>, or <c>false</c> when the hosts could not be
    /// created (the failure is written to the trace log instead of being swallowed).
    /// </returns>
    public override bool OnStart()
    {
        // Set the maximum number of concurrent connections
        ServicePointManager.DefaultConnectionLimit = 12;

        // For information on handling configuration changes
        // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

        bool result = base.OnStart();

        Trace.TraceInformation("ReportWorkerRole has been started");

        //EventHub Processing
        try
        {
            string eventHubNameA = CloudConfigurationManager.GetSetting("EventHubNameA");
            string eventHubNameB = CloudConfigurationManager.GetSetting("EventHubNameB");

            // NOTE: if this connection string carries an "entityPath=..." segment it must
            // name the same hub that is passed to the host below; a stale entity path lets
            // the processors open but ProcessEventsAsync is never invoked.
            string eventHubConnectionString = CloudConfigurationManager.GetSetting("EventHubConnectionString");
            string storageConnectionString = CloudConfigurationManager.GetSetting("AzureStorageAccountConnectionString");

            // Each host gets a unique name so that every role instance is told apart.
            string eventProcessorHostNameA = Guid.NewGuid().ToString();
            this.eventProcessorHostA = new EventProcessorHost(
                eventProcessorHostNameA,
                eventHubNameA,
                EventHubConsumerGroup.DefaultGroupName,
                eventHubConnectionString,
                storageConnectionString);

            string eventProcessorHostNameB = Guid.NewGuid().ToString();
            this.eventProcessorHostB = new EventProcessorHost(
                eventProcessorHostNameB,
                eventHubNameB,
                EventHubConsumerGroup.DefaultGroupName,
                eventHubConnectionString,
                storageConnectionString);
        }
        catch (Exception ex)
        {
            // Without the hosts the role cannot do any work; report the failure now
            // instead of hitting a NullReferenceException later in RunAsync/OnStop.
            Trace.TraceError("ReportWorkerRole failed to create the event processor hosts: {0}", ex);
            return false;
        }

        return result;
    }

    /// <summary>
    /// Unregisters both processors, requests cancellation of the run loop and waits
    /// for <see cref="Run"/> to signal completion before stopping the role.
    /// </summary>
    public override void OnStop()
    {
        Trace.TraceInformation("ReportWorkerRole is stopping");

        // The hosts are null when OnStart failed before creating them.
        if (this.eventProcessorHostA != null)
        {
            this.eventProcessorHostA.UnregisterEventProcessorAsync().Wait();
        }

        if (this.eventProcessorHostB != null)
        {
            this.eventProcessorHostB.UnregisterEventProcessorAsync().Wait();
        }

        this.cancellationTokenSource.Cancel();
        this.runCompleteEvent.WaitOne();

        base.OnStop();

        Trace.TraceInformation("ReportWorkerRole has stopped");
    }

    /// <summary>
    /// Registers both event processors and then idles, tracing a heartbeat once per
    /// second, until cancellation is requested.
    /// </summary>
    /// <param name="cancellationToken">Token cancelled by <see cref="OnStop"/>.</param>
    private async Task RunAsync(CancellationToken cancellationToken)
    {
        // NOTE(review): PrefetchCount is smaller than MaxBatchSize - confirm this is intended.
        var options = new EventProcessorOptions()
        {
            MaxBatchSize = 100,
            PrefetchCount = 10,
            ReceiveTimeOut = TimeSpan.FromSeconds(20),
            //InitialOffsetProvider = (partitionId) => DateTime.Now
        };

        // Surface host-level failures; otherwise they are invisible.
        options.ExceptionReceived += (sender, e) =>
        {
            Trace.TraceError("Event processor host error during '{0}': {1}", e.Action, e.Exception);
        };

        // Await instead of blocking with .Wait() inside an async method.
        await this.eventProcessorHostA.RegisterEventProcessorAsync<SimpleEventProcessorA>(options);
        await this.eventProcessorHostB.RegisterEventProcessorAsync<SimpleEventProcessorB>(options);

        // TODO: Replace the following with your own logic.
        while (!cancellationToken.IsCancellationRequested)
        {
            Trace.TraceInformation("Working");

            try
            {
                // Passing the token lets OnStop end the loop immediately instead of
                // waiting for the current delay to elapse.
                await Task.Delay(1000, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}

事件處理器A(事件處理器B的結構相同):

/// <summary>
/// Event processor for hub A. It writes every received message to the console and
/// checkpoints at most once every five minutes.
/// </summary>
class SimpleEventProcessorA : IEventProcessor
{
    // Measures the time since the last checkpoint; started in OpenAsync.
    Stopwatch checkpointStopWatch;

    //Non-relevant variables omitted

    /// <summary>
    /// Creates the processor. Initialization failures are logged, not rethrown.
    /// </summary>
    public SimpleEventProcessorA()
    {
        try
        {
            //Initializing variables using CloudConfigurationManager

            //Logging omitted
        }
        catch (Exception ex)
        {
            Trace.TraceError("SimpleEventProcessorA - Constructor failed: {0}", ex);
        }
    }

    /// <summary>
    /// Called when the processor is closed for a partition; checkpoints when the
    /// close reason is <see cref="CloseReason.Shutdown"/>.
    /// </summary>
    /// <param name="context">Context of the partition being closed.</param>
    /// <param name="reason">Why the processor is being closed.</param>
    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        //Logging omitted

        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    /// <summary>
    /// Called when the processor is opened for a partition; starts the checkpoint timer.
    /// </summary>
    /// <param name="context">Context of the partition being opened.</param>
    /// <returns>A completed task; failures are logged and not propagated.</returns>
    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        try
        {
            //Logging omitted

            Console.WriteLine("Initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();
        }
        catch (Exception ex)
        {
            Trace.TraceError("SimpleEventProcessorA - OpenAsync failed: {0}", ex);
        }

        return Task.FromResult<object>(null);
    }

    /// <summary>
    /// Handles a batch of events: prints each payload and checkpoints when more than
    /// five minutes have passed since the previous checkpoint.
    /// </summary>
    /// <param name="context">Context of the partition the events came from.</param>
    /// <param name="messages">The received batch.</param>
    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        //Logging omitted

        foreach (EventData eventData in messages)
        {
            try
            {
                // Decode the payload; the original referenced an undefined 'data' local.
                // NOTE(review): assumes the body is UTF-8 text - confirm against the sender.
                string data = System.Text.Encoding.UTF8.GetString(eventData.GetBytes());

                Console.WriteLine("Message received. Partition: '{0}', Data: '{1}'",
                    context.Lease.PartitionId, data);
            }
            catch (Exception ex)
            {
                Trace.TraceError("SimpleEventProcessorA - ProcessEventsAsync failed: {0}", ex);

                throw;
            }
        }

        //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            this.checkpointStopWatch.Restart();
        }
    }

}

希望得到任何幫助,謝謝!

UPDATE

看起來代碼本身一切正常......問題出在我向事件中心推送數據時所使用的連接字符串。

這是我的事件中心連接字符串:

Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey] 

entityPath設置不正確。它正在使用我設定的較舊的事件中心名稱。它應該是爲eventHubNameA或eventHubNameB設置的值。

+0

您可以分享創建主機並註冊EventProcessor的代碼嗎? –

+0

@PeterBons更新,謝謝。 – RizJa

+0

在'EventProcessorOptions'中,嘗試訂閱'GeneralProcessor_ExceptionReceived'事件並將'InvokeProcessorAfterReceiveTimeout'屬性設置爲true。 – cassandrad

回答

0

我在此自行回答這個問題,以便其他人可以從中受益。雖然答案已在問題的「更新」一節中詳細說明,但我在此重申:

entityPath設置不正確。它正在使用我設定的較舊的事件中心名稱。它應該是爲eventHubNameA或eventHubNameB設置的值。

而不是 Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[old-eventhub-name];SharedAccessKey=[mykey]

它應該是Endpoint=sb://[myEventHubName].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;entityPath=[eventHubNameA];SharedAccessKey=[mykey]