2017-10-07 82 views
1

我有一個天青工作者角色,並且事件處理器主機連接到一個天青事件集線器。由於某種未知的原因 - 它不會收到任何消息。事件處理器主機未收到消息

日誌顯示它爲每個分區打開EventProcessor - 並且沒有錯誤 - 但ProcessEventsAsync永遠不會被調用。

使用服務總線瀏覽器我可以看到它在處理器關閉時接收到消息,當它處於關閉狀態時會拋出接收器處於打開狀態的異常。

  • 我沒有得到它的工作一次,但重啓後並沒有繼續工作

我不知道在哪裏旁邊看 - 這不過是輔助角色的代碼

public class WorkerRole : RoleEntryPoint 
{ 
    private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); 
    private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false); 

    private EventProcessorHost _eventProcessorHost; 
    private IEventProcessorFactory _processorFactory; 
    private ConfigurationProvider configuration = new ConfigurationProvider(); 
    private string _eventHubConnectionString; 
    private string _storageAccountConnectionString; 
    private string _dbConnectionString; 

    public override void Run() 
    { 
     Trace.TraceInformation("EventHubWorker is running"); 


     try 
     { 
      RunAsync(_cancellationTokenSource.Token).Wait(); 
     } 
     finally 
     { 
      _runCompleteEvent.Set(); 
     } 
    } 

    public override bool OnStart() 
    { 
     Trace.TraceInformation("EventHubWorker is starting"); 
     CompositeResolver.RegisterAndSetAsDefault(FormattersResolver.Instance, ContractlessStandardResolver.Instance, StandardResolver.Instance); 
     // Set the maximum number of concurrent connections 
     ServicePointManager.DefaultConnectionLimit = 12; 
     SqlMapper.AddTypeHandler(new DateTimeHandler()); 
     _eventHubConnectionString = configuration.EventHubConnectionString; 
     _dbConnectionString = configuration.DbConnectionString; 
     _storageAccountConnectionString = configuration.StorageConnectionString; 
     string hostName = Guid.NewGuid().ToString(); 
     var eventClient = EventHubClient.CreateFromConnectionString(_eventHubConnectionString, configuration.EventHubName); 

     _eventProcessorHost = new EventProcessorHost(hostName, eventClient.Path, configuration.ConsumerGroupName, 
      _eventHubConnectionString, _storageAccountConnectionString); 

     var partitionOptions = new PartitionManagerOptions() 
     { 
      LeaseInterval = new TimeSpan(0, 5, 0) 
     }; 
     _processorFactory = new EventProcessorFactory(/* some data for dependency injection */); 

     return base.OnStart(); 
    } 

    public override void OnStop() 
    { 
     Trace.TraceInformation("EventHubWorker is stopping"); 

     _cancellationTokenSource.Cancel(); 
     _runCompleteEvent.WaitOne(); 
     base.OnStop(); 

     Trace.TraceInformation("EventHubWorker has stopped"); 
    } 

    private async Task RunAsync(CancellationToken cancellationToken) 
    { 
     int retryCount = 0; 
     var exceptions = new List<Exception>(); 
     async Task StartProcessing() 
     { 
      if (retryCount > 5) 
      { 
       throw new AggregateException($"failed to run service, tried {retryCount} times",exceptions); 
      } 
      try 
      { 
       await _eventProcessorHost.RegisterEventProcessorFactoryAsync(_processorFactory, new EventProcessorOptions 
       { 
        InitialOffsetProvider = o => DateTime.UtcNow, 
        MaxBatchSize = 100, 
        PrefetchCount = 10, 
        ReceiveTimeOut = TimeSpan.FromSeconds(20), 
       }); 
      } 
      catch(MessagingException e) when (e.IsTransient) 
      { 
       retryCount++; 
       exceptions.Add(e); 
       await StartProcessing(); 
      } 
     } 
     var options = new EventProcessorOptions(); 
     options.ExceptionReceived += Options_ExceptionReceived; 

     await StartProcessing(); 

     cancellationToken.WaitHandle.WaitOne(); 
     await _eventProcessorHost.UnregisterEventProcessorAsync(); 
    } 

    private void Options_ExceptionReceived(object sender, ExceptionReceivedEventArgs e) 
    { 
     Trace.TraceError(e.Exception.Message); 
    } 
} 

這是EventProcessor代碼 - 工廠本身似乎無關緊要

class EventProcessor : IEventProcessor 
{ 
    public async Task CloseAsync(PartitionContext context, CloseReason reason) 
    { 
        //never logged 
     Trace.TraceInformation($"Partition {context.Lease.PartitionId} Closed"); 
     if (reason == CloseReason.Shutdown) 
     { 
      await context.CheckpointAsync(); 
     } 
     else 
     { 
      Trace.TraceError(reason.ToString()); 
     } 
    } 

    public Task OpenAsync(PartitionContext context) 
    { 
        //always logs with the expected lease information 
     Trace.TraceInformation($"Partition {context.Lease.PartitionId} initiailized with epoch {context.Lease.Epoch}"); 
     return Task.FromResult<object>(null); 
    } 

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
    { 
     Trace.TraceInformation("processing event"); //never called 
     // processing code 
    } 
+0

如果你在那裏放置斷點,它是否從'OpenAsync'到'CloseAsync'? –

+0

我會嘗試放置一個斷點 - 但是當我調用'CloseAsync'時記錄,並且沒有這樣的日誌。 – gilmishal

+0

正如預期的那樣,斷點也沒有受到影響 – gilmishal

回答

0

PartitionManagerOptions的最大租約間隔爲60秒(與blob租約相同) EventProcessorHost在最初獲取租約時不會引發異常。嘗試將租賃時間間隔設置爲60秒而不是5分鐘。