我在我建立一個應用程序使用elixir phoenix websocket和我有一個看起來像這樣的史詩:rxjs/Redux的觀察者:observable.retry重新建立連接
const socketObservable = Observable.create((observer: Object) => {
const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
token: readSession(),
} });
socket.connect();
socket.onOpen(() =>
observer.next({ type: SOCKET_CONNECTED, socket }),
);
socket.onError((error) =>
observer.error({ type: WEBSOCKET_ERROR, error }),
);
return() => {
// socket.disconnect();
};
});
const connectToSocket = (
action$: Object,
) => action$.ofType(CONNECT_TO_SOCKET)
.switchMap(() =>
socketObservable
.catch((error) => Observable.of(error)),
)
.retry();
export default connectToSocket;
我想當網絡連接通過發出{ type: WEBSOCKET_ERROR, error }
而消失時通知用戶,並在通過發出{ type: SOCKET_CONNECTED, socket }
重新建立連接時刪除通知。那麼我得到了第一部分工作,但是當重新連接發生時,{ type: SOCKET_CONNECTED, socket }
從不發送。使用終極版,傳奇,我可以使用下面的代碼,使這項工作:
const connectToSocket =(): Object =>
eventChannel((emitter: (Object) => mixed) => {
const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: {
token: readSession(),
} });
socket.connect();
socket.onOpen(() => emitter({ socket }));
socket.onError((error) => {
emitter({ error });
});
return() => {
// socket.disconnect();
};
});
export function* callConnectToSocket(): Generator<IOEffect, *, *> {
const chan = yield call(connectToSocket);
while (true) {
const { socket, error } = yield take(chan);
if (socket) {
yield put({ type: SOCKET_CONNECTED, socket });
} else {
yield put({ error, type: WEBSOCKET_ERROR });
}
}
}
export function* watchConnectToSocket(): Generator<IOEffect, *, *> {
yield takeLatest(CONNECT_TO_SOCKET, callConnectToSocket);
}
對於rxjs代碼,我以爲在鏈的末端套結.retry()
應該觸發我的源代碼的重試觀察到,如果en錯誤按照documentation for rxjs Observable.retry發出,但可能我並不真正瞭解retry
應該做什麼或者如何正確使用它。可能有人可以幫助實現我想要的。
謝謝@Sergey Karavaev。我已經嘗試了您的建議,但沒有奏效。 – samba6
只是想澄清一下:你想讓'retry'重新建立套接字連接還是從錯誤中恢復?在第一種情況下,你可能試着將'retry'調用移動到內部observable上,緊接着'catch'後面。 –
我還有一個想法:你是否期望你的'socketObservable'在產生'WEBSOCKET_ERROR'之後產生'SOCKET_CONNECTED'通知?即即使在同一套接字實例上調用了'onError'之後,onOpen'回調函數可能會被調用嗎?如果是這樣,那麼這是不可能的,因爲'Rx'不允許這樣做。 'Rx'遵循語法:'next *(complete | error)?',這意味着在同一個流內的'error'之後不會有其他通知。因此,無論您是僅使用「next」通知(如最終所做的那樣),還是在收到「error」後重新創建'socketObservable'。 –