2011-09-29 54 views
3

我有一個場景,這是非常接近這個示例:的Node.js + socket.io +節點AMQP和隊列binginds當「再」連接直通socket.io

一個主屏幕:

  • 這個屏幕(客戶端)將通過服務器連接到socket.io服務器:9090/scope(io.connect(「http:// server:9090/scope))並將發送一個事件」userBindOk「 .emit(「userBindOk」,message))到socket.io服務器;

  • 服務器接收到連接並且「userBindOk 」。此時,服務器應該獲得到rabbitmq服務器的活動連接,並將隊列綁定到通過socket.io連接到應用程序的相應用戶。樣品:

    socket.on( 「連接」,函數(客戶端){// 客戶端ID爲1234 //綁定RabbitMQ的交換,隊列和: queue.subscribe(//接收回調); } )

  • 到目前爲止,沒問題 - 我可以通過socket.io發送/接收消息,沒有任何問題。

  • 但是,如果我刷新頁面,所有這些步驟將再次完成。因此,綁定到隊列將發生,但這次與另一個socket.io客戶端會話相關。這意味着如果我向與第一個socket.io會話相關的隊列發送消息(在頁面刷新之前),該綁定應該(我認爲)接收消息並將其發送給無效的socket.io客戶端(頁面在socket.io上下文中刷新= new client.id)。我可以證明這種行爲,因爲每次刷新頁面時,我都需要發送更多的消息x次。例如:我已經第一次連接了: - 所以,1條消息 - 一次屏幕更新;刷新頁面:我需要發送2條消息到隊列,只有第二條消息將從「實際」socket.io客戶端會話接收 - 這種行爲將發生多達我刷新頁面(20頁刷新,20條消息被髮送到一個隊列,並且服務器socket.io「last」客戶端將把該消息發送到客戶端socket.io以呈現到屏幕中)。

我認爲解決方案是:

  • 找到一個方法來「解除綁定」從socket.io服務器斷開連接時的隊列 - 我didn`t看到在節點AMQP此選項API尚未(等待它:D)

  • 找到一種方法來使用相同的client.id重新連接socket.io客戶端。通過這種方式,我可以識別即將到來的客戶端並應用一些邏輯來緩存套接字。

任何想法?我想很清楚......但是,你知道,它`不是那麼eaey試圖澄清的東西,是非常具體的一些方面,當暴露你的問題......

TKS

回答

1

我解決它這樣的:

我用來聲明rabbitMq隊列爲持久= true,autoDelete = false,exclusive = false,並在我的應用程序有1個隊列/用戶和1交換(type =直接)與routing_key名稱= queueName,我的應用程序也使用隊列爲其他客戶端不同瀏覽器,如android應用程序或iPhone應用程序作爲推後備,所以我用crear 1隊列爲earch用戶。

此問題的解決方案是更改我的rabbitMQ隊列和交換聲明。現在,我將exchange/user聲明爲fanout和autoDelete = True,並且用戶將擁有持久= true,autoDelete = true,exclusive = true(隊列號=客戶端)的N個隊列,並且所有隊列都綁定到用戶交換(多播)。

注意:我的應用程序在django中很流行,並且我使用node + socket + amqp能夠使用web.scokets與瀏覽器通信,因此我使用node-restler查詢我的應用程序api以獲取用戶 - 隊列信息。

多數民衆贊成在RabbitMQ的一側,用於節點+ AMQP +插座我這樣做:

服務器端:

  • 的onConnect:用戶交流的扇出,自動刪除,耐用的聲明。然後將隊列聲明爲持久的,自動刪除和排他性的,然後將queue.bind傳遞給用戶交換並最終queue.subscribe和socket.disconnect將銷燬隊列,以便在客戶端連接應用程序時存在隊列,這解決了刷新的問題,並允許用戶已經超過1窗口標籤與應用:

服務器端:

  /* 
      * unCaught exception handler 
      */ 

      process.on('uncaughtException', function (err) { 
       sys.p('Caught exception: ' + err); 
       global.connection.end(); 
      }); 


      /* 
      * Requiere libraries 
      */ 

      global.sys = require('sys'); 
      global.amqp = require('amqp'); 
      var rest = require('restler'); 
      var io = require('socket.io').listen(8080); 

      /* 
      * Module global variables 
      */ 
      global.amqpReady = 0; 


      /* 
      * RabbitMQ connection 
      */ 

      global.connection = global.amqp.createConnection({ 
          host: host, 
          login: adminuser, 
          password: adminpassword, 
          vhost: vhost 
          }); 

      global.connection.addListener('ready', 
         function() { 
          sys.p("RabbitMQ connection stablished"); 
          global.amqpReady = 1; 
         } 
      ); 


      /* 
      * Web-Socket declaration 
      */ 

      io.sockets.on('connection', function (socket) { 
       socket.on('message', function (data) { 
        sys.p(data); 
        try{ 
         var message = JSON.parse(data);     
        }catch(error){ 
         socket.emit("message", JSON.stringify({"error": "invalid_params", "code": 400})); 
         var message = {}; 
        }   
        var message = JSON.parse(data); 
        if(message.token != undefined) { 

         rest.get("http://dev.kinkajougames.com/api/push", 
           {headers: 
            { 
             "x-geochat-auth-token": message.token 
            } 
           }).on('complete', 
            function(data) { 
             a = data; 
           }).on('success', 
            function (data){ 
             sys.p(data); 
             try{         
              sys.p("---- creating exchange"); 
              socket.exchange = global.connection.exchange(data.data.bind, {type: 'fanout', durable: true, autoDelete: true}); 
              sys.p("---- declarando queue"); 
              socket.q = global.connection.queue(data.data.queue, {durable: true, autoDelete: true, exclusive: false}, 
               function(){ 
                sys.p("---- bind queue to exchange"); 
                //socket.q.bind(socket.exchange, "*"); 
                socket.q.bind(socket.exchange, "*"); 
                sys.p("---- subscribing queue exchange"); 
                socket.q.subscribe(function (message) { 
                 socket.emit("message", message.data.toString()); 
                });  
               } 
              ); 
             }catch(err){ 
              sys.p("Imposible to connection to rabbitMQ-server"); 
             }         

           }).on('error', function (data){ 
            a = { 
             data: data, 
            }; 
           }).on('400', function() { 
            socket.emit("message", JSON.stringify({"error": "connection_error", "code": 400})); 
           }).on('401', function() { 
            socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401})); 
           });    
        } 
        else { 
         socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401})); 
        } 

       }); 
       socket.on('disconnect', function() { 
        socket.q.destroy(); 
        sys.p("closing socket"); 
       }); 
      }); 

客戶端:

  • 插座intance與選項的力量新連接'= true並且'在卸載時同步斷開連接'=假。
  • 客戶端使用onbeforeunload和onunload窗口對象事件發送socket.disconnect
  • socket.connect事件上的客戶端發送用戶令牌給節點。從插座

     var socket; 
         function webSocket(){ 
          //var socket = new io.Socket(); 
          socket = io.connect("ws.dev.kinkajougames.com", {'force new connection':true, 'sync disconnect on unload': false}); 
          //socket.connect(); 
    
          onSocketConnect = function(){ 
           alert('Connected'); 
           socket.send(JSON.stringify({ 
            token: Get_Cookie('liveScoopToken') 
           })); 
          }; 
    
          socket.on('connect', onSocketConnect); 
          socket.on('message', function(data){ 
           message = JSON.parse(data); 
           if (message.action == "chat") { 
            if (idList[message.data.sender] != undefined) { 
             chatboxManager.dispatch(message.data.sender, { 
              first_name: message.data.sender 
             }, message.data.message); 
            } 
            else { 
             var username = message.data.sender; 
             Data.Collections.Chats.add({ 
              id: username, 
              title: username, 
              user: username, 
              desc: "Chat", 
              first_name: username, 
              last_name: "" 
             }); 
             idList[message.data.sender] = message.data.sender; 
             chatboxManager.addBox(message.data.sender, { 
              title: username, 
              user: username, 
              desc: "Chat", 
              first_name: username, 
              last_name: "", 
              boxClosed: function(id){ 
               alert("closing"); 
              } 
             }); 
             chatboxManager.dispatch(message.data.sender, { 
              first_name: message.data.sender 
             }, message.data.message); 
            } 
           } 
          }); 
         }       
    
         webSocket(); 
    
         window.onbeforeunload = function() { 
          return "You have made unsaved changes. Would you still like to leave this page?"; 
         } 
    
         window.onunload = function(){ 
          socket.disconnect(); 
         } 
    

  • 理線的消息就是這樣,該消息的所以沒有更多的圓形robing。