1
我有一個 Azure 背景工作角色(Worker Role),其中的事件處理器主機(EventProcessorHost)連接到一個 Azure 事件中樞(Event Hub)。由於某種未知的原因,它收不到任何消息。(標題:事件處理器主機未收到消息)
日誌顯示它爲每個分區都打開了 EventProcessor,並且沒有任何錯誤,但 ProcessEventsAsync 永遠不會被調用。
使用 Service Bus Explorer,我可以看到在處理器停止時它能夠接收到消息;而當處理器處於運行狀態時,它會拋出「接收器已處於打開狀態」的異常。
我曾經讓它成功運行過一次,但重啓之後就不再工作了。
我不知道接下來該從哪裏查起——以下是背景工作角色(Worker Role)的代碼:
/// <summary>
/// Worker role entry point that registers an event processor factory with an
/// <see cref="EventProcessorHost"/> and keeps it registered until the role is stopped.
/// </summary>
public class WorkerRole : RoleEntryPoint
{
    /// <summary>
    /// Number of transient registration failures tolerated before giving up.
    /// Registration is attempted at most <c>MaxRegistrationRetries + 1</c> times.
    /// </summary>
    private const int MaxRegistrationRetries = 5;

    private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);
    private readonly ConfigurationProvider _configuration = new ConfigurationProvider();
    private EventProcessorHost _eventProcessorHost;
    private IEventProcessorFactory _processorFactory;
    private string _eventHubConnectionString;
    private string _storageAccountConnectionString;
    private string _dbConnectionString;

    /// <summary>
    /// Runs the role: blocks until <see cref="RunAsync"/> finishes, then signals
    /// <see cref="OnStop"/> that the run loop has completed (even on failure).
    /// </summary>
    public override void Run()
    {
        Trace.TraceInformation("EventHubWorker is running");
        try
        {
            // Run() is a synchronous override, so blocking here is unavoidable.
            RunAsync(_cancellationTokenSource.Token).Wait();
        }
        finally
        {
            _runCompleteEvent.Set();
        }
    }

    /// <summary>
    /// Reads configuration, registers serializers/type handlers and constructs the
    /// <see cref="EventProcessorHost"/> and processor factory used by <see cref="RunAsync"/>.
    /// </summary>
    /// <returns>The result of <c>base.OnStart()</c>.</returns>
    public override bool OnStart()
    {
        Trace.TraceInformation("EventHubWorker is starting");
        CompositeResolver.RegisterAndSetAsDefault(FormattersResolver.Instance, ContractlessStandardResolver.Instance, StandardResolver.Instance);

        // Set the maximum number of concurrent connections
        ServicePointManager.DefaultConnectionLimit = 12;
        SqlMapper.AddTypeHandler(new DateTimeHandler());

        _eventHubConnectionString = _configuration.EventHubConnectionString;
        _dbConnectionString = _configuration.DbConnectionString;
        _storageAccountConnectionString = _configuration.StorageConnectionString;

        // A fresh host name is generated on every start.
        string hostName = Guid.NewGuid().ToString();
        var eventClient = EventHubClient.CreateFromConnectionString(_eventHubConnectionString, _configuration.EventHubName);
        _eventProcessorHost = new EventProcessorHost(hostName, eventClient.Path, _configuration.ConsumerGroupName,
            _eventHubConnectionString, _storageAccountConnectionString);

        _processorFactory = new EventProcessorFactory(/* some data for dependency injection */);
        return base.OnStart();
    }

    /// <summary>
    /// Requests cancellation of the run loop and waits for <see cref="Run"/> to finish
    /// before delegating to <c>base.OnStop()</c>.
    /// </summary>
    public override void OnStop()
    {
        Trace.TraceInformation("EventHubWorker is stopping");
        _cancellationTokenSource.Cancel();
        _runCompleteEvent.WaitOne();
        base.OnStop();
        Trace.TraceInformation("EventHubWorker has stopped");
    }

    /// <summary>
    /// Registers the processor factory (retrying transient failures), waits until
    /// cancellation is requested and then unregisters the event processor.
    /// </summary>
    /// <param name="cancellationToken">Token signalled by <see cref="OnStop"/>.</param>
    /// <exception cref="AggregateException">
    /// Registration kept failing with transient <see cref="MessagingException"/>s.
    /// </exception>
    private async Task RunAsync(CancellationToken cancellationToken)
    {
        await RegisterWithRetryAsync();

        try
        {
            // Wait for shutdown without blocking a thread.
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Expected: cancellation is the signal to shut down.
        }

        await _eventProcessorHost.UnregisterEventProcessorAsync();
    }

    /// <summary>
    /// Attempts to register the processor factory, retrying while the failure is a
    /// transient <see cref="MessagingException"/>. Non-transient failures propagate immediately.
    /// </summary>
    private async Task RegisterWithRetryAsync()
    {
        var exceptions = new List<Exception>();
        int retryCount = 0;

        while (true)
        {
            if (retryCount > MaxRegistrationRetries)
            {
                throw new AggregateException($"failed to run service, tried {retryCount} times", exceptions);
            }

            try
            {
                await _eventProcessorHost.RegisterEventProcessorFactoryAsync(_processorFactory, CreateProcessorOptions());
                return;
            }
            catch (MessagingException e) when (e.IsTransient)
            {
                retryCount++;
                exceptions.Add(e);
            }
        }
    }

    /// <summary>
    /// Builds the options passed to the host. The exception handler is attached to the
    /// same instance that is handed to the host, so reported errors reach the trace log.
    /// </summary>
    private EventProcessorOptions CreateProcessorOptions()
    {
        var options = new EventProcessorOptions
        {
            // NOTE(review): presumably this makes partitions without a checkpoint start
            // at the moment of registration, skipping earlier events - confirm against
            // the EventProcessorOptions documentation.
            InitialOffsetProvider = o => DateTime.UtcNow,
            MaxBatchSize = 100,
            PrefetchCount = 10,
            ReceiveTimeOut = TimeSpan.FromSeconds(20),
        };
        options.ExceptionReceived += Options_ExceptionReceived;
        return options;
    }

    /// <summary>Writes the message of an exception reported by the processor host to the trace log.</summary>
    private void Options_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
    {
        Trace.TraceError(e.Exception.Message);
    }
}
以下是 EventProcessor 的代碼——工廠本身似乎無關緊要:
class EventProcessor : IEventProcessor
{
/// <summary>
/// Called when the processor for a partition is closed. Logs the partition id, then
/// checkpoints on a shutdown close and logs any other close reason as an error.
/// </summary>
/// <param name="context">Partition context whose lease id is logged and which is checkpointed.</param>
/// <param name="reason">Why the processor is being closed.</param>
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
    // never logged (author's observation)
    Trace.TraceInformation($"Partition {context.Lease.PartitionId} Closed");

    if (reason != CloseReason.Shutdown)
    {
        // Any close other than a shutdown is reported as an error and not checkpointed.
        Trace.TraceError(reason.ToString());
        return;
    }

    await context.CheckpointAsync();
}
/// <summary>
/// Called when the processor for a partition is opened. Logs the lease's partition id
/// and epoch and completes immediately.
/// </summary>
/// <param name="context">Partition context supplying the lease information.</param>
/// <returns>An already-completed task.</returns>
public Task OpenAsync(PartitionContext context)
{
    var lease = context.Lease;

    // always logs with the expected lease information
    Trace.TraceInformation($"Partition {lease.PartitionId} initiailized with epoch {lease.Epoch}");

    return Task.FromResult<object>(null);
}
/// <summary>
/// Handles a batch of events delivered for a partition. The visible body only writes a
/// trace line; the actual handling is elided in this excerpt.
/// </summary>
/// <param name="context">Context of the partition the events were read from.</param>
/// <param name="messages">The batch of events to process.</param>
/// <remarks>
/// NOTE(review): the method is declared <c>async</c> but the visible body contains no
/// <c>await</c> - confirm the elided processing code awaits something.
/// </remarks>
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
Trace.TraceInformation("processing event"); //never called
// processing code
}
如果你在那裏放置斷點,它是否會從 'OpenAsync' 直接走到 'CloseAsync'? –
我會嘗試放置一個斷點——但是我在 'CloseAsync' 被調用時會寫日誌,而日誌中並沒有這樣的記錄。 – gilmishal
正如預期的那樣,斷點也沒有被命中 – gilmishal