2013-02-15

SignalR with Redis message bus failover using BookSleeve's ConnectionUtils.Connect()

I am trying to create a Redis message bus failover scenario for a SignalR application.

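First, we tried a simple hardware load balancer (HLB) failover, which just monitored the two Redis servers. The SignalR application pointed to the single HLB endpoint. I then failed one server, but could not receive any messages through the second Redis server without recycling the SignalR application pool. Presumably this is because the setup commands need to be issued again against the new Redis message bus.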

As of SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus uses BookSleeve's RedisConnection() to connect to a single Redis server for pub/sub.

I created a new class, RedisMessageBusCluster(), which uses BookSleeve's ConnectionUtils.Connect() to connect to one server in a cluster of Redis servers.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using BookSleeve; 
using Microsoft.AspNet.SignalR.Infrastructure; 

namespace Microsoft.AspNet.SignalR.Redis 
{ 
    /// <summary> 
    /// WIP: Getting scaleout for Redis working 
    /// </summary> 
    public class RedisMessageBusCluster : ScaleoutMessageBus 
    { 
     private readonly int _db; 
     private readonly string[] _keys; 
     private RedisConnection _connection; 
     private RedisSubscriberConnection _channel; 
     private Task _connectTask; 

     private readonly TaskQueue _publishQueue = new TaskQueue(); 

     public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver) 
      : base(resolver) 
     { 
      _db = db; 
      _keys = keys.ToArray(); 

      // uses a list of connections 
      _connection = ConnectionUtils.Connect(serverList); 

      //_connection = new RedisConnection(host: server, port: port, password: password); 

      _connection.Closed += OnConnectionClosed; 
      _connection.Error += OnConnectionError; 


      // Start the connection - TODO: this Open() could be removed because the connection is already open, but _connectTask is used later on 
      _connectTask = _connection.Open().Then(() => 
      { 
       // Create a subscription channel in redis 
       _channel = _connection.GetOpenSubscriberChannel(); 

       // Subscribe to the registered connections 
       _channel.Subscribe(_keys, OnMessage); 

       // Dirty hack but it seems like subscribe returns before the actual 
       // subscription is properly setup in some cases 
       while (_channel.SubscriptionCount == 0) 
       { 
        Thread.Sleep(500); 
       } 
      }); 
     } 


     protected override Task Send(Message[] messages) 
     { 
      return _connectTask.Then(msgs => 
      { 
       var taskCompletionSource = new TaskCompletionSource<object>(); 

       // Group messages by source (connection id) 
       var messagesBySource = msgs.GroupBy(m => m.Source); 

       SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource); 

       return taskCompletionSource.Task; 
      }, 
      messages); 
     } 

     private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource) 
     { 
      if (!enumerator.MoveNext()) 
      { 
       taskCompletionSource.TrySetResult(null); 
      } 
      else 
      { 
       IGrouping<string, Message> group = enumerator.Current; 

       // Get the channel index we're going to use for this message 
       int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length; 

       string key = _keys[index]; 

       // Increment the channel number 
       _connection.Strings.Increment(_db, key) 
            .Then((id, k) => 
            { 
             var message = new RedisMessage(id, group.ToArray()); 

             return _connection.Publish(k, message.GetBytes()); 
            }, key) 
            .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource) 
            .ContinueWithNotComplete(taskCompletionSource); 
      } 
     } 

     private void OnConnectionClosed(object sender, EventArgs e) 
     { 
      // Should we auto reconnect? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e) 
     { 
      // How do we bubble errors? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnMessage(string key, byte[] data) 
     { 
      // The key is the stream id (channel) 
      var message = RedisMessage.Deserialize(data); 

      _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages)); 
     } 

     protected override void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       if (_channel != null) 
       { 
        _channel.Unsubscribe(_keys); 
        _channel.Close(abort: true); 
       } 

       if (_connection != null) 
       { 
        _connection.Close(abort: true); 
       }     
      } 

      base.Dispose(disposing); 
     } 
    } 
} 
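A side note on `SendImpl`: the channel index is computed as `Math.Abs(group.Key.GetHashCode()) % _keys.Length`, and `Math.Abs(int.MinValue)` throws `OverflowException`, so a connection id whose hash happens to be `int.MinValue` would fault the send. This is unrelated to the failover problem, but here is a minimal sketch of an overflow-free index. `ChannelSelector`, `ChannelIndex` and `SelectKey` are hypothetical names, not SignalR APIs.

```csharp
using System;

// Hypothetical helper (not part of SignalR): maps a hash code to a key index
// without the Math.Abs(int.MinValue) overflow present in the original SendImpl.
public static class ChannelSelector
{
    public static int ChannelIndex(int hashCode, int keyCount)
    {
        if (keyCount <= 0)
            throw new ArgumentOutOfRangeException("keyCount", "At least one key is required.");

        // Reinterpret the hash as unsigned (unchecked, so negatives never overflow),
        // then take the remainder; the result is always in [0, keyCount).
        uint unsignedHash = unchecked((uint)hashCode);
        return (int)(unsignedHash % (uint)keyCount);
    }

    // Picks the Redis key (channel) for a message source, as SendImpl does.
    public static string SelectKey(string sourceKey, string[] keys)
    {
        return keys[ChannelIndex(sourceKey.GetHashCode(), keys.Length)];
    }
}
```

In `SendImpl`, the index line would become `int index = ChannelSelector.ChannelIndex(group.Key.GetHashCode(), _keys.Length);`. Treating the hash as unsigned still spreads sources across all keys; it just cannot throw.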

BookSleeve has its own mechanism for determining a master and will automatically fail over to another server, and I am now testing this with SignalR.Chat.

In web.config, I set the list of available servers:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/> 

Then, in Application_Start():

 // Redis cluster server list 
     string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"]; 

     List<string> eventKeys = new List<string>(); 
     eventKeys.Add("SignalR.Redis.FailoverTest"); 
     GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys); 

I added two extra methods to Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys) 
{ 
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys); 
} 

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys) 
{ 
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver)); 
    resolver.Register(typeof(IMessageBus),() => bus.Value); 

    return resolver; 
} 

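The problem now is this: if I keep a few breakpoints enabled until after a user name has been added, and then disable all breakpoints, the application works as expected. However, if breakpoints are disabled from the start, there appears to be a race condition during the connection process that can cause it to fail.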

So, in RedisMessageBusCluster():

// Start the connection 
    _connectTask = _connection.Open().Then(() => 
    { 
     // Create a subscription channel in redis 
     _channel = _connection.GetOpenSubscriberChannel(); 

     // Subscribe to the registered connections 
     _channel.Subscribe(_keys, OnMessage); 

     // Dirty hack but it seems like subscribe returns before the actual 
     // subscription is properly setup in some cases 
     while (_channel.SubscriptionCount == 0) 
     { 
      Thread.Sleep(500); 
     } 
    }); 

I tried adding a Task.Wait, and even an additional Sleep() (not shown above). These did make it wait, but I still got the error.
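The "dirty hack" loop spins forever if `SubscriptionCount` never becomes non-zero (for example, if the connection was closed underneath it), and it blocks a thread inside the `Then` continuation while doing so. A bounded wait does not remove the race, but it turns a silent hang into a visible failure. A minimal sketch; `Polling.WaitUntil` is a hypothetical helper, not part of SignalR or BookSleeve.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper: polls a condition until it is true or a timeout elapses,
// instead of looping forever like the original subscription check.
public static class Polling
{
    public static bool WaitUntil(Func<bool> condition, TimeSpan timeout, TimeSpan pollInterval)
    {
        Stopwatch stopwatch = Stopwatch.StartNew();
        while (!condition())
        {
            if (stopwatch.Elapsed >= timeout)
                return false; // give up; the caller decides how to report it

            Thread.Sleep(pollInterval);
        }
        return true;
    }
}
```

Inside the `Then` callback, the loop could become `if (!Polling.WaitUntil(() => _channel.SubscriptionCount > 0, TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100))) throw new TimeoutException("Redis subscription was not confirmed in time.");`. The 10-second timeout and 100 ms interval are arbitrary placeholders.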

The recurring error appears to be in BookSleeve's MessageQueue.cs, around line 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll 
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821 
    --- End of inner exception stack trace --- 
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<--- 



public void Enqueue(RedisMessage item, bool highPri) 
{ 
    lock (stdPriority) 
    { 
     if (closed) 
     { 
      throw new InvalidOperationException("The queue is closed"); 
     } 

This is where the "queue is closed" exception is thrown.
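The stack trace shows `Strings.Increment` being enqueued on a connection whose message queue has already been closed, which is exactly the case the `OnConnectionClosed` stub ("Should we auto reconnect?") leaves unhandled. One generic pattern is to replace a closed connection with a fresh one and retry the operation once. The sketch below uses a hypothetical `IPublisher` interface and a `FakePublisher` test double rather than BookSleeve types, and it does not restore subscriptions or SignalR cursors, which is a separate problem.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction over a Redis connection; not a BookSleeve or SignalR type.
public interface IPublisher
{
    bool IsOpen { get; }
    long Publish(string channel, byte[] payload);
}

// Swaps a closed connection for a fresh one from a factory and retries once.
public sealed class ReconnectingPublisher
{
    private readonly Func<IPublisher> _connect;
    private readonly object _sync = new object();
    private IPublisher _current;

    public ReconnectingPublisher(Func<IPublisher> connect)
    {
        _connect = connect;
        _current = connect();
    }

    public long Publish(string channel, byte[] payload)
    {
        IPublisher publisher = GetOpenPublisher();
        try
        {
            return publisher.Publish(channel, payload);
        }
        catch (InvalidOperationException)
        {
            // e.g. "The queue is closed": the connection died between the IsOpen
            // check and the call. Reconnect once; a second failure propagates.
            return Replace(publisher).Publish(channel, payload);
        }
    }

    private IPublisher GetOpenPublisher()
    {
        lock (_sync)
        {
            if (!_current.IsOpen)
                _current = _connect();
            return _current;
        }
    }

    private IPublisher Replace(IPublisher failed)
    {
        lock (_sync)
        {
            // Another thread may already have reconnected; replace only once.
            if (ReferenceEquals(_current, failed))
                _current = _connect();
            return _current;
        }
    }
}

// Test double that can simulate a closed queue.
public sealed class FakePublisher : IPublisher
{
    public bool IsOpen { get; set; }
    public bool ThrowOnPublish { get; set; }
    public int Published { get; private set; }

    public FakePublisher() { IsOpen = true; }

    public long Publish(string channel, byte[] payload)
    {
        if (ThrowOnPublish)
            throw new InvalidOperationException("The queue is closed");
        Published++;
        return Published;
    }
}
```

With BookSleeve, the factory would wrap something like `ConnectionUtils.Connect(serverList)` behind an adapter implementing `IPublisher`; that adapter is an assumption and is not shown.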

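I foresee another problem: since the Redis connection is made in Application_Start(), there may be issues when "reconnecting" to another server. I think this was fine with the single RedisConnection(), where there is only one connection to choose from. With the introduction of ConnectionUtils.Connect(), however, I would like to hear from @dfowler or the other SignalR folks how SignalR handles this scenario.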

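I'll take a look, but the first thing that stands out is that you don't need to call 'Open', because you have already opened the connection. That said, I won't be able to look right away, as I'm about to board a plane – 2013-02-15 20:16:32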

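I believe there are two issues here: 1) how BookSleeve handles failover; 2) how SignalR uses cursors to track clients. When a new message bus is initialized, none of the cursors from mb1 exist on mb2. That is why it starts working once the SignalR application pool is reset, and not before, which is obviously not a viable option. – ElHaix 2013-02-19 16:04:50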

A link describing how SignalR uses cursors: http://stackoverflow.com/questions/13054592/how-does-signalr-redis-work-under-the-hood/13063449#13063449 – ElHaix 2013-02-19 16:52:25
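The cursor issue can be illustrated with a deliberately simplified model. Each Redis server keeps its own `INCR` counter per key, so on a fresh server the counter starts again at 1. If a client only accepts ids greater than the last id it has seen (an assumption standing in for SignalR's real cursor logic, which is more involved), the cursor it built against the first server hides the second server's messages until the new ids catch up. `FakeRedisServer`, `CursorClient` and `CursorFailoverModel` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for a Redis server: one INCR counter per key.
public class FakeRedisServer
{
    private readonly Dictionary<string, long> _counters = new Dictionary<string, long>();

    // Mimics INCR: a missing key starts at 0, so the first id returned is 1.
    public long Increment(string key)
    {
        long value;
        _counters.TryGetValue(key, out value);
        value++;
        _counters[key] = value;
        return value;
    }
}

// Simplified client cursor: only ids newer than the last seen id are delivered.
public class CursorClient
{
    public long LastSeenId { get; private set; }

    public bool TryDeliver(long messageId)
    {
        if (messageId <= LastSeenId)
            return false;
        LastSeenId = messageId;
        return true;
    }
}

public static class CursorFailoverModel
{
    // Sends messagesBefore on the primary, fails over, sends messagesAfter on the
    // secondary, and returns how many post-failover messages the client accepts.
    public static int CountDeliveredAfterFailover(int messagesBefore, int messagesAfter)
    {
        const string key = "SignalR.Redis.FailoverTest";
        var primary = new FakeRedisServer();
        var secondary = new FakeRedisServer();
        var client = new CursorClient();

        for (int i = 0; i < messagesBefore; i++)
            client.TryDeliver(primary.Increment(key));

        int delivered = 0;
        for (int i = 0; i < messagesAfter; i++)
        {
            if (client.TryDeliver(secondary.Increment(key)))
                delivered++;
        }
        return delivered;
    }
}
```

In this model, `CountDeliveredAfterFailover(10, 5)` delivers nothing and `CountDeliveredAfterFailover(10, 15)` delivers only the last 5 messages, which resembles the symptom of messages vanishing until the application pool (and with it every cursor) is reset.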

Answer

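The SignalR team has now implemented support for a custom connection factory based on StackExchange.Redis, the successor to BookSleeve, which supports redundant Redis connections through ConnectionMultiplexer.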

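The original problem was that, even though I had created my own extension methods on top of BookSleeve to accept a collection of servers, failover was not possible.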

Now, with BookSleeve having evolved into StackExchange.Redis, we can configure the collection of servers/ports directly in the Connect call.

The new approach is much simpler than the path I went down when creating the UseRedisCluster methods, and the back-end plumbing now supports true failover:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true"); 
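The configuration string mixes endpoints (`host:port`) with `key=value` options such as `allowAdmin=true`, in the same comma-separated shape as the `redis.serverList` setting from the question. StackExchange.Redis parses this string itself (see `ConfigurationOptions.Parse`); the sketch below is only a hypothetical illustration of how the tokens divide up, and `RedisConfigSketch` is not a library type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; the real parsing is done by StackExchange.Redis.
// IPv6 literals and quoted values are not handled here.
public sealed class RedisConfigSketch
{
    public const int DefaultPort = 6379; // Redis's conventional port, assumed when none is given

    public readonly List<Tuple<string, int>> Endpoints = new List<Tuple<string, int>>();
    public readonly Dictionary<string, string> Options =
        new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

    public static RedisConfigSketch Parse(string configuration)
    {
        var result = new RedisConfigSketch();
        foreach (string raw in configuration.Split(','))
        {
            string token = raw.Trim();
            if (token.Length == 0)
                continue;

            int eq = token.IndexOf('=');
            if (eq >= 0)
            {
                // key=value tokens are options, e.g. allowAdmin=true
                result.Options[token.Substring(0, eq).Trim()] = token.Substring(eq + 1).Trim();
                continue;
            }

            // Everything else is an endpoint: "host" or "host:port"
            int colon = token.LastIndexOf(':');
            if (colon >= 0)
                result.Endpoints.Add(Tuple.Create(token.Substring(0, colon), int.Parse(token.Substring(colon + 1))));
            else
                result.Endpoints.Add(Tuple.Create(token, DefaultPort));
        }
        return result;
    }
}
```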

StackExchange.Redis also allows additional manual configuration, as outlined in the Automatic and Manual Configuration section of its documentation:

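using System; 
using System.Collections.Generic; 
using StackExchange.Redis; 

ConfigurationOptions config = new ConfigurationOptions 
{ 
    EndPoints = 
    { 
     { "redis0", 6379 }, 
     { "redis1", 6380 } 
    }, 
    CommandMap = CommandMap.Create(new HashSet<string> 
    { // EXCLUDE a few commands 
     "INFO", "CONFIG", "CLUSTER", 
     "PING", "ECHO", "CLIENT" 
    }, available: false), 
    KeepAlive = 180, 
    DefaultVersion = new Version(2, 8, 8), 
    Password = "changeme" 
}; 

// Connect using the options above 
ConnectionMultiplexer conn = ConnectionMultiplexer.Connect(config); 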

In essence, the ability to initialize our SignalR scale-out environment with a collection of servers now solves the initial problem.

Should I award the 500-rep bounty to your answer? ;) – nicael 2014-06-24 13:54:05

Well, if you think it is now *the answer* :) – ElHaix 2014-06-24 15:19:35

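@ElHaix Since you asked the question, you are probably best placed to say whether your answer is conclusive or only one piece of the puzzle. I suggest adding a sentence stating whether, and how, it solves your problem – 2014-06-25 13:14:29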