我在WebSocket(WS)內發出Redis訂閱。當我接收到WS打開時,我將請求線程化,然後實例化Redis客戶端。在公開之內,我爲Redis提供線程併發布訂閱。WebSocket和Redis導致從pubsub和/或brpop掛起連接
這一切正常,直到我收到一個意想不到的WS關閉。此時,運行Redis訂閱的線程消失了。如果我發出取消訂閱,我會得到一個掛起。如果我不退訂,我已經離開了一個幻影訂閱,導致我接下來發生麻煩。
發佈它的線程終止後,是否有某種方法可以刪除訂閱?我已經注意到,Redis實例對於該終止的線程有一個mon變量。示例Ruby代碼是:
class Backend
include MInit
def initialize(app)
setup
@app = app
end
def run!(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME)
ws_thread = Thread.fork(env) do
credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password)
ws.on :open do |event|
channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
redis_thread = Thread.fork do
credis.subscribe(channel) do |on|
on.message do |message_channel, message|
sent = ws.send(message)
end
on.unsubscribe do |message_channel|
puts "Unsubscribe on channel:#{channel};"
end
end
end
end
ws.on :message do |event|
handoff(ws: ws, event: event)
end
ws.on :close do |event|
# Hang occurs here
unsubscribed = credis.unsubscribe(channel)
end
ws.on :error do |event|
ws.close
end
# Return async Rack response
ws.rack_response
end
end
else
@app.call(env)
end
private
def handoff(ws: nil, event: nil, source: nil, message: nil)
# processing
end
end
您可能會嘗試的另一種方法是讓Redis定期向主題發佈一些無害消息,強制所有訂閱都執行_write_。任何「幻像」訂閱應該隨後被清除/關閉,因爲Redis檢測到寫入死/關對方的錯誤。 – Castaglia
@Castaglia有趣。就我而言,知道以前的訂閱沒有清楚地知道如何清除它,但在恢復時,我發出了另一個訂閱並繼續處理。這會將消息發佈到相同頻道,包括幻像訂閱和同名的新直播頻道。你希望能夠清除幻影嗎?我問,因爲隨着時間的推移,我遇到了這種情況下的資源限制。 –
啊,我想我明白了。訂閱綁定到TCP連接,_that_ TCP連接仍然存在;它是客戶端上的_thread_,它從應用程序的其餘部分的POV中去世,從而「失去」訂閱。是?我想知道,如果從同一個TCP連接(清除任何幻像/丟失/滯留的訂閱)執行'UNSUBSCRIBE'(指定所有頻道,或者不指定所有頻道),然後重新訂閱,可能會工作。 – Castaglia