2017-04-02 91 views
0

我在WebSocket(WS)內發出Redis訂閱。當我接收到WS打開時,我將請求線程化,然後實例化Redis客戶端。在公開之內,我爲Redis提供線程併發布訂閱。WebSocket和Redis導致從pubsub和/或brpop掛起連接

這一切正常,直到我收到一個意想不到的WS關閉。此時,運行Redis訂閱的線程消失了。如果我發出取消訂閱,我會得到一個掛起。如果我不退訂,我已經離開了一個幻影訂閱,導致我接下來發生麻煩。

發佈它的線程終止後,是否有某種方法可以刪除訂閱?我已經注意到,Redis實例對於該終止的線程有一個mon變量。示例Ruby代碼是:

class Backend 
    include MInit 

    def initialize(app) 
    setup 
    @app = app 
    end 

    def run!(env) 
    if Faye::WebSocket.websocket?(env) 
     ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME) 
     ws_thread = Thread.fork(env) do 
     credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password) 

     ws.on :open do |event| 
      channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length] 
      redis_thread = Thread.fork do 
      credis.subscribe(channel) do |on| 
       on.message do |message_channel, message| 
       sent = ws.send(message) 
       end 
       on.unsubscribe do |message_channel| 
       puts "Unsubscribe on channel:#{channel};" 
       end 
      end 
      end 
     end 

     ws.on :message do |event| 
      handoff(ws: ws, event: event) 
     end 

     ws.on :close do |event| 
      # Hang occurs here 
      unsubscribed = credis.unsubscribe(channel) 
     end 

     ws.on :error do |event| 
      ws.close 
     end 

     # Return async Rack response 
     ws.rack_response 

     end 
    end 
    else 
    @app.call(env) 
    end 

    private 
    def handoff(ws: nil, event: nil, source: nil, message: nil) 
    # processing 
    end 
end 

回答

1

一旦我真正理解了問題,該修復就相當簡單。 Redis線程實際上依然存在。但是,Redis仍然掛着,因爲它正在等待線程獲得控制權。要做到這一點,在WS.close代碼需要通過如下WS.close內使用EM.next_tick割讓控制:

ws.on :close do |event| 
    EM.next_tick do 
    # Hang occurs here 
    unsubscribed = credis.unsubscribe(channel) 
    end 
end 
+0

您可能會嘗試的另一種方法是讓Redis定期向主題發佈一些無害消息,強制所有訂閱都執行_write_。任何「幻像」訂閱應該隨後被清除/關閉,因爲Redis檢測到寫入死/關對方的錯誤。 – Castaglia

+0

@Castaglia有趣。就我而言,知道以前的訂閱沒有清楚地知道如何清除它,但在恢復時,我發出了另一個訂閱並繼續處理。這會將消息發佈到相同頻道,包括幻像訂閱和同名的新直播頻道。你希望能夠清除幻影嗎?我問,因爲隨着時間的推移,我遇到了這種情況下的資源限制。 –

+0

啊,我想我明白了。訂閱綁定到TCP連接,_that_ TCP連接仍然存在;它是客戶端上的_thread_,它從應用程序的其餘部分的POV中去世,從而「失去」訂閱。是?我想知道,如果從同一個TCP連接(清除任何幻像/丟失/滯留的訂閱)執行'UNSUBSCRIBE'(指定所有頻道,或者不指定所有頻道),然後重新訂閱,可能會工作。 – Castaglia

1

這更多的是一種長的評論提供了一個解決辦法,而不是一個解決方案。

如果是我的申請,我會重新考慮設計。

爲每個websocket客戶端打開一個新的Redis連接和一個新線程是一個相當大的資源承諾。

只是爲了說明,與Redis的每個連接都需要TCP/IP套接字(這是一種有限的資源)和內存。線程的保留堆棧內存應該爲每個線程花費大約2Mib ...因此,1K Redis連接和線程會在內存中產生大約2Gib的成本。

除了Redis服務器本身具有有限數量的連接,它通常可以接受這一事實之外(儘管這通常是價格而不是硬性限制的問題,因爲它們是按比例設計的) 。

重新調整設計應該非常簡單,以便單個線程和連接爲所有websocket客戶端提供服務,這也將使管理更容易。

這可以使用內部每進程廣播系統(例如由plezi.io實現)或使用Redis subscribe/punsubscribe命令執行。

+0

我首先看了plezi.io。事實上,我最近審查它,看看它是否有退訂。它根本不符合我的需求。是的,我瞭解Redis連接的負擔,因爲我監控它們。這是我專注於清理孤兒的原因之一。實際上,在這個過程中,我從PUBSUB轉換到LPUSH/BRPOP,雖然在這裏遇到同樣的問題,但它更容易管理。我的最終用例只需要我選擇的設計。但是,感謝您的建議。 –

+0

@R_G,我想有時候設計不在我們手中......我只能祝你好運! ... P.S. ......我不是故意推plezi,我很抱歉,如果它出現這種方式。 – Myst

+0

當然不是!我來這裏尋求建議,並感謝大家的努力! –