2016-07-22 40 views
0

我們有一個雲服務,它使用輔助角色(worker role)來處理從Azure服務總線(Service Bus)上設置的主題接收到的消息。問題是:這個處理Azure主題的輔助角色在大約60秒後就停止處理消息。

消息本身似乎完好無損,通常會被正確接收和處理。然而,在某些情況下,消息的處理似乎會中途停止(日誌記錄突然結束,並且我們的WadLogsTable中再也沒有對正在處理的消息的引用)。根據我的研究,這可能是因爲輔助角色(worker role)讓連接保持打開並閒置超過幾秒鐘而導致的。我該如何防止這些需要長時間處理的消息被放棄?

我們工人角色的代碼如下。

/// <summary>
/// Worker role entry point that registers a message callback on the "AllMessages"
/// subscription of the "MyTopic" Service Bus topic and inserts the received call data
/// through <c>_callLogRepository</c>.
/// </summary>
/// <remarks>
/// NOTE(review): nothing visible in this class waits on <c>_completedEvent</c>, so once the
/// callback has been registered <see cref="Run"/> reaches its end and returns (assuming
/// OnMessage itself does not block - confirm against the Service Bus documentation).
/// </remarks>
public class WorkerRole : RoleEntryPoint 
{ 
    // Dependency-injection kernel; (re)created by CreateKernel during OnStart.
    private static StandardKernel _kernel; 
    // Set in OnStop. No code visible here ever waits on it.
    private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false); 
    // NOTE(review): the repositories are never assigned in the visible code; presumably
    // the snipped bindings in CreateKernel take care of that - confirm.
    private BaseRepository<CallData> _callDataRepository; 
    private BaseRepository<CallLog> _callLogRepository; 

    // Subscription client, namespace manager and pump options; all created in OnStart.
    private SubscriptionClient _client; 
    private NamespaceManager _nManager; 
    private OnMessageOptions _options; 
    private BaseRepository<Site> _siteRepository; 

    /// <summary>
    /// Registers the message callback on the subscription client. Any exception thrown
    /// while registering is logged and swallowed.
    /// </summary>
    public override void Run() 
    { 
     try 
     { 
      // Declared outside the lambda, so the same variable is captured and reused by
      // every callback invocation.
      List<CallInformation> callInfo; 
      Trace.WriteLine("Starting processing of messages"); 

      // Initiates the message pump; the callback is invoked for each message that is
      // received. Calling Close on the client stops the pump.

      _client.OnMessage(message => 
      { 
       // Process one message from the subscription.
       Trace.TraceInformation("Call Received. Ready to process message "); 
       // NOTE(review): the client is created with ReceiveMode.ReceiveAndDelete in OnStart;
       // confirm whether RenewLock is meaningful (or even permitted) in that mode.
       message.RenewLock(); 
       callInfo = message.GetBody<List<CallInformation>>(); 
       writeCallData(callInfo); 


       Trace.TraceInformation("Call Processed. Clearing from topic."); 
      }, _options); 
     } 
     catch (Exception e) 
     { 
      // Failures are traced at Information level, not Error.
      Trace.TraceInformation("Error: " + e.Message + "---" + e.StackTrace); 
     } 
     // No blocking call follows, so the method ends here.
    } 

    /// <summary>
    /// Inserts every call in <paramref name="callList"/> and then its datapoints.
    /// Any exception aborts the remaining calls in the list and is logged (message only,
    /// no stack trace); nothing is rethrown to the caller.
    /// </summary>
    /// <param name="callList">Calls deserialized from one message body.</param>
    private void writeCallData(List<CallInformation> callList) 
    { 
     try 
     { 
      Trace.TraceInformation("Calls received: " + callList.Count); 
      foreach (var callInfo in callList) 
      { 
       Trace.TraceInformation("Unwrapping call..."); 
       var call = callInfo.CallLog.Unwrap(); 
       Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints"); 
       Trace.TraceInformation("Inserting Call..."); 
       _callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/); 
        Trace.TraceInformation("Call entry written. Now building datapoint list..."); 
        // Unwrap all datapoints up front so the loop below works on a materialized list.
        var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList(); 
        Trace.TraceInformation("datapoint list constructed. Processing datapoints..."); 
        foreach (var data in datapoints) 
        { 
         /*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */ 
        } 
        Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID); 
       Trace.TraceInformation("Call Processed successfully."); 
      } 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Call Processing Failed. " + e.Message); 
     } 
    } 

    /// <summary>
    /// Ensures the topic and subscription exist, creates the subscription client and the
    /// message-pump options, builds the kernel and runs a start-up SQL command.
    /// </summary>
    /// <returns>
    /// The result of <c>base.OnStart()</c>. This is returned even when initialization threw,
    /// because the exception is only logged.
    /// </returns>
    public override bool OnStart() 
    { 
     try 
     { 
      var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"); 
      _nManager = NamespaceManager.CreateFromConnectionString(connectionString); 
      // TimeSpan(days, hours, minutes, seconds): 10 minutes.
      _nManager.Settings.OperationTimeout = new TimeSpan(0,0,10,0); 
      // Description used only if the topic has to be created below.
      var topic = new TopicDescription("MyTopic") 
      { 
       DuplicateDetectionHistoryTimeWindow = new TimeSpan(0, 0, 10, 0), 
       DefaultMessageTimeToLive = new TimeSpan(0, 0, 10, 0), 
       RequiresDuplicateDetection = true, 
      }; 
      if (!_nManager.TopicExists("MyTopic")) 
      { 
       _nManager.CreateTopic(topic); 
      } 
      if (!_nManager.SubscriptionExists("MyTopic", "AllMessages")) 
      { 
       _nManager.CreateSubscription("MyTopic", "AllMessages"); 
      } 
      // NOTE(review): ReceiveAndDelete is combined with AutoRenewTimeout below and with
      // RenewLock in Run; confirm that lock renewal applies in this receive mode.
      _client = SubscriptionClient.CreateFromConnectionString(connectionString, "MyTopic", "AllMessages", 
       ReceiveMode.ReceiveAndDelete); 
      _options = new OnMessageOptions 
      { 
        AutoRenewTimeout = TimeSpan.FromMinutes(5), 

      }; 
      _options.ExceptionReceived += LogErrors; 
      CreateKernel(); 

      _callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/); 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Error on roleStart:" + e.Message + "---" + e.StackTrace); 
     } 
     return base.OnStart(); 
    } 

    /// <summary>
    /// Closes the subscription client and then sets <c>_completedEvent</c>.
    /// </summary>
    public override void OnStop() 
    { 
     // Close the connection to the Service Bus subscription.
     _client.Close(); 
     _completedEvent.Set(); 
    } 

    /// <summary>
    /// Handler for <c>OnMessageOptions.ExceptionReceived</c>: logs the exception and closes
    /// the client. Per the comment in Run, closing the client stops the message pump.
    /// </summary>
    void LogErrors(object sender, ExceptionReceivedEventArgs e) 
    { 
     if (e.Exception != null) 
     { 
      Trace.TraceInformation("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace); 
      _client.Close(); 
     } 
    } 

    /// <summary>
    /// Creates a new <c>StandardKernel</c>, stores it in the static field and returns it.
    /// The repository bindings are snipped from this listing.
    /// </summary>
    public IKernel CreateKernel() 
    { 
     _kernel = new StandardKernel(); 
     /*SNIP: Bind NInjectable repositories */ 
     return _kernel; 
    } 
} 
+0

當運行完成後,我觀察到worker角色等待幾秒鐘後再次調用OnStart並重新進入Run() –

回答

1

TheDude的回覆非常接近正確答案!事實證明他說得對:Run方法需要保持活躍狀態,而不是立即返回。不過,由於使用的是Azure Service Bus的消息泵機制,不能把_client.OnMessage(...)放在while循環中,因爲這會導致錯誤(消息泵已被初始化)。

實際需要做的是:在輔助角色(worker role)開始執行之前創建一個ManualResetEvent(手動重置事件),然後在執行消息泵代碼之後等待它。有關ManualResetEvent的文檔,請參閱https://msdn.microsoft.com/en-us/library/system.threading.manualresetevent(v=vs.110).aspx。此外,這個做法在這裡也有說明:http://www.acousticguitar.pro/questions/607359/using-queueclient-onmessage-in-an-azure-worker-role

我最後的工人角色類看起來是這樣的:

/// <summary>
/// Worker role entry point that pumps messages from the "AllMessages" subscription of
/// the "MyTopic" Service Bus topic and inserts the call data they carry.
/// </summary>
/// <remarks>
/// <see cref="Run"/> registers the message callback and then blocks on
/// <c>_completedEvent</c> so the role stays alive. The event is released by
/// <see cref="OnStop"/>, and also by <c>LogErrors</c> after it closes the client.
/// </remarks>
public class WorkerRole : RoleEntryPoint
{
    private const string TopicName = "MyTopic";
    private const string SubscriptionName = "AllMessages";

    private static StandardKernel _kernel;

    // Keeps Run blocked until the role is stopping or the message pump has been shut down.
    private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

    // NOTE(review): never assigned in the visible code; presumably the snipped bindings in
    // CreateKernel take care of that - confirm.
    private BaseRepository<CallLog> _callLogRepository;

    private SubscriptionClient _client;
    private MessagingFactory _mFact;
    private NamespaceManager _nManager;
    private OnMessageOptions _options;

    /// <summary>
    /// Starts the message pump and then blocks until <c>_completedEvent</c> is signalled.
    /// </summary>
    public override void Run()
    {
        try
        {
            // Initiates the message pump; the callback is invoked for each message that is
            // received. Calling Close on the client stops the pump.
            _client.OnMessage(message =>
            {
                Trace.TraceInformation("Call Received. Ready to process message " + message.MessageId);

                // Local to the callback so invocations never share state.
                var callInfo = message.GetBody<CallInformation>();
                WriteCallData(callInfo);

                Trace.TraceInformation("Call Processed. Clearing from topic.");
            }, _options);
        }
        catch (Exception e)
        {
            Trace.TraceError("Error: " + e.Message + "---" + e.StackTrace);
        }

        // Wait on the field that OnStop signals. Waiting on a separate local event would
        // block forever, because nothing could ever set it.
        _completedEvent.WaitOne();
    }

    /// <summary>
    /// Processes a single call by delegating to the list overload.
    /// </summary>
    /// <param name="callInfo">The call deserialized from one message body.</param>
    private void WriteCallData(CallInformation callInfo)
    {
        WriteCallData(new List<CallInformation> { callInfo });
    }

    /// <summary>
    /// Inserts every call in <paramref name="callList"/> and then its datapoints.
    /// Any exception aborts the remaining calls in the list and is logged; nothing is
    /// rethrown to the message pump.
    /// </summary>
    /// <param name="callList">Calls to insert.</param>
    private void WriteCallData(List<CallInformation> callList)
    {
        try
        {
            Trace.TraceInformation("Calls received: " + callList.Count);
            foreach (var callInfo in callList)
            {
                Trace.TraceInformation("Unwrapping call...");
                var call = callInfo.CallLog.Unwrap();
                Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints");
                Trace.TraceInformation("Inserting Call...");
                _callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/);
                Trace.TraceInformation("Call entry written. Now building datapoint list...");

                // Unwrap all datapoints up front so the loop below works on a materialized list.
                var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList();
                Trace.TraceInformation("datapoint list constructed. Processing datapoints...");
                foreach (var data in datapoints)
                {
                    /*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */
                }
                Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID);
                Trace.TraceInformation("Call Processed successfully.");
            }
        }
        catch (Exception e)
        {
            Trace.TraceError("Call Processing Failed. " + e.Message);
        }
    }

    /// <summary>
    /// Ensures the topic and subscription exist, creates the subscription client and the
    /// message-pump options, builds the kernel and runs a start-up SQL command.
    /// </summary>
    /// <returns>
    /// The result of <c>base.OnStart()</c>. This is returned even when initialization threw,
    /// because the exception is only logged.
    /// </returns>
    public override bool OnStart()
    {
        try
        {
            var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            _nManager = NamespaceManager.CreateFromConnectionString(connectionString);
            _nManager.Settings.OperationTimeout = TimeSpan.FromMinutes(10);

            if (!_nManager.TopicExists(TopicName))
            {
                var topic = new TopicDescription(TopicName)
                {
                    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),
                    DefaultMessageTimeToLive = TimeSpan.FromMinutes(10),
                    RequiresDuplicateDetection = true,
                };
                _nManager.CreateTopic(topic);
            }

            if (!_nManager.SubscriptionExists(TopicName, SubscriptionName))
            {
                _nManager.CreateSubscription(TopicName, SubscriptionName);
            }

            // NOTE(review): in ReceiveAndDelete mode the message is removed as soon as it is
            // received, so AutoRenewTimeout presumably has no lock to renew, and a message
            // whose processing is interrupted is not redelivered. Confirm whether PeekLock
            // is the intended mode.
            _client = SubscriptionClient.CreateFromConnectionString(connectionString, TopicName, SubscriptionName,
                ReceiveMode.ReceiveAndDelete);
            _options = new OnMessageOptions
            {
                AutoRenewTimeout = TimeSpan.FromMinutes(5),
            };
            _options.ExceptionReceived += LogErrors;
            CreateKernel();

            _callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/);
        }
        catch (Exception e)
        {
            Trace.TraceError("Error on roleStart:" + e.Message + "---" + e.StackTrace);
        }

        return base.OnStart();
    }

    /// <summary>
    /// Closes the subscription client, which stops the message pump, and releases
    /// <see cref="Run"/>.
    /// </summary>
    public override void OnStop()
    {
        try
        {
            // _client is still null if OnStart failed before creating it.
            if (_client != null)
            {
                _client.Close();
            }
        }
        finally
        {
            // Always release Run, even if closing the client throws.
            _completedEvent.Set();
        }
    }

    /// <summary>
    /// Handler for <c>OnMessageOptions.ExceptionReceived</c>: logs the exception, closes the
    /// client and releases <see cref="Run"/>.
    /// </summary>
    /// <remarks>
    /// Closing the client stops the pump. Leaving <see cref="Run"/> blocked afterwards
    /// would keep the role alive but idle, so the event is signalled to let Run return.
    /// The role runtime then recycles the instance.
    /// </remarks>
    void LogErrors(object sender, ExceptionReceivedEventArgs e)
    {
        if (e.Exception == null)
        {
            return;
        }

        Trace.TraceError("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace);
        try
        {
            _client.Close();
        }
        finally
        {
            _completedEvent.Set();
        }
    }

    /// <summary>
    /// Creates a new <c>StandardKernel</c>, stores it in the static field and returns it.
    /// The repository bindings are snipped from this listing.
    /// </summary>
    public IKernel CreateKernel()
    {
        _kernel = new StandardKernel();
        /*SNIP: Bind NInjectable repositories */
        return _kernel;
    }
}

你會注意到代碼中使用了ManualResetEvent,並且在我的Run方法結尾調用了WaitOne()。希望這對大家有幫助!

1

您的Run方法不會無限期地繼續。它應該是這樣的:

/// <summary>
/// Role main loop. It never returns under normal operation, because returning from Run
/// causes the role instance to be recycled.
/// </summary>
public override void Run()
{
    try
    {
        Trace.WriteLine("WorkerRole entrypoint called", "Information");
        while (true)
        {
            // Add code here that runs in the role instance

            // Pause between iterations so the loop does not spin a CPU core at 100%
            // when there is nothing to do.
            System.Threading.Thread.Sleep(10000);
        }
    }
    catch (Exception e)
    {
        Trace.WriteLine("Exception during Run: " + e.ToString());
        // Take other action as needed.
    }
}

docs摘自:

Run方法被視爲應用程序的主要方法。重寫Run方法不是必需的;默認實現從不返回。如果你重寫Run方法,你的代碼應該無限期地阻塞。如果Run方法返回,角色會被自動回收:系統會引發Stopping事件並調用OnStop方法,以便在角色脫機之前執行關閉流程。

+0

嘿,謝謝你的回答。這讓我非常接近需要發生的事情。絕對值得的,但我會在我的回答中解釋。 –

相關問題