2017-07-28 93 views
0

我在我建立一個應用程序使用elixir phoenix websocket和我有一個看起來像這樣的史詩:rxjs/Redux的觀察者:observable.retry重新建立連接

const socketObservable = Observable.create((observer: Object) => { 
    const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: { 
    token: readSession(), 
    } }); 

    socket.connect(); 

    socket.onOpen(() => 
     observer.next({ type: SOCKET_CONNECTED, socket }), 
); 

    socket.onError((error) => 
     observer.error({ type: WEBSOCKET_ERROR, error }), 
); 

    return() => { 
    // socket.disconnect(); 
    }; 
}); 

const connectToSocket = (
    action$: Object, 
) => action$.ofType(CONNECT_TO_SOCKET) 
.switchMap(() => 
    socketObservable 
    .catch((error) => Observable.of(error)), 
) 
.retry(); 

export default connectToSocket; 

我想當網絡連接通過發出{ type: WEBSOCKET_ERROR, error }而消失時通知用戶,並在通過發出{ type: SOCKET_CONNECTED, socket }重新建立連接時刪除通知。那麼我得到了第一部分工作,但是當重新連接發生時,{ type: SOCKET_CONNECTED, socket }從不發送。使用終極版,傳奇,我可以使用下面的代碼,使這項工作:

const connectToSocket =(): Object => 
    eventChannel((emitter: (Object) => mixed) => { 
    const socket = new Socket(`${getWebSocketUrl()}/socket`, { params: { 
     token: readSession(), 
    } }); 

    socket.connect(); 

    socket.onOpen(() => emitter({ socket })); 

    socket.onError((error) => { 
     emitter({ error }); 
    }); 

    return() => { 
     // socket.disconnect(); 
    }; 
    }); 

export function* callConnectToSocket(): Generator<IOEffect, *, *> { 
    const chan = yield call(connectToSocket); 
    while (true) { 
    const { socket, error } = yield take(chan); 
    if (socket) { 
     yield put({ type: SOCKET_CONNECTED, socket }); 
    } else { 
     yield put({ error, type: WEBSOCKET_ERROR }); 
    } 
    } 
} 

export function* watchConnectToSocket(): Generator<IOEffect, *, *> { 
    yield takeLatest(CONNECT_TO_SOCKET, callConnectToSocket); 
} 

對於rxjs代碼,我以爲在鏈的末端套結.retry()應該觸發我的源代碼的重試觀察到,如果en錯誤按照documentation for rxjs Observable.retry發出,但可能我並不真正瞭解retry應該做什麼或者如何正確使用它。可能有人可以幫助實現我想要的。

回答

0

對於retry算子生效,其源觀察值必須產生一個錯誤。而且在您的示例中,似乎錯誤通知從未達到retry,因爲它被從錯誤中恢復的catch運算符吞噬。

要使其工作,你可以嘗試讓catch運營商返回一個可觀察的,首先發出一個動作,然後產生一個錯誤:

const connectToSocket = action$ => 
    actions$.ofType(CONNECT_TO_SOCKET) 
     .switchMap(() => socketObservable 
      .catch(error => Observable.of(error).concat(Observable.throw(error))) 
     ) 
     .retry(); 

更新:

我認爲這是值得一提的是,Rx遵循語法next* (complete|error)?,這意味着next()呼叫在同一觀察者上的error()之後將不起作用。因此,如果socket從錯誤中恢復並且在執行onError後執行onOpen回調,SOCKET_CONNECTED通知將不會到達使用者。

這可以通過兩種替代errornext通知或重新發生錯誤socketObservable每一次,這意味着一個新的socket實例將被創建來處理可能(但是這可能不是你想要的)。

下面是一個可運行的代碼示例演示如何retry可能的工作:

const { createStore, applyMiddleware } = Redux; 
 
const { createEpicMiddleware } = ReduxObservable; 
 

 
const socketObservable = Rx.Observable.create(observer => { 
 
    const t1 = setTimeout(() => observer.next({ type: "SOCKET_CONNECTED" }), 200); 
 
    const t2 = setTimeout(() => observer.error({ type: "SOCKET_ERROR" }), 400); 
 

 
    return() => { 
 
     clearTimeout(t1); 
 
     clearTimeout(t2); 
 
    }; 
 
}) 
 

 
const connectToSocket = action$ => action$ 
 
    .do(action => console.log(action)) 
 
    .ofType("CONNECT_TO_SOCKET") 
 
    .switchMap(() => socketObservable 
 
     .catch(error => Rx.Observable.of(error).concat(Rx.Observable.throw(error))) 
 
     // make 2 attempts to re-connect, i.e. restart socketObservable 
 
     .retry(2) 
 
    ) 
 
    // recover in case if both attempts to reconnect have failed 
 
    .retry(); 
 

 
const store = createStore(
 
    (state, action) => state, 
 
    applyMiddleware(createEpicMiddleware(connectToSocket))); 
 

 
// dispatch CONNECT_TO_SOCKET two times 
 
Rx.Observable.interval(2000) 
 
    .take(2) 
 
    .subscribe(x => store.dispatch({ type: "CONNECT_TO_SOCKET" }));
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script> 
 
<script src="https://unpkg.com/[email protected]/dist/redux.min.js"></script> 
 
<script src="https://unpkg.com/[email protected]/dist/redux-observable.min.js"></script>

+0

謝謝@Sergey Karavaev。我已經嘗試了您的建議,但沒有奏效。 – samba6

+0

只是想澄清一下:你想讓'retry'重新建立套接字連接還是從錯誤中恢復?在第一種情況下,你可能試着將'retry'調用移動到內部observable上,緊接着'catch'後面。 –

+0

我還有一個想法:你是否期望你的'socketObservable'在產生'WEBSOCKET_ERROR'之後產生'SOCKET_CONNECTED'通知?即即使在同一套接字實例上調用了'onError'之後,onOpen'回調函數可能會被調用嗎?如果是這樣,那麼這是不可能的,因爲'Rx'不允許這樣做。 'Rx'遵循語法:'next *(complete | error)?',這意味着在同一個流內的'error'之後不會有其他通知。因此,無論您是僅使用「next」通知(如最終所做的那樣),還是在收到「error」後重新創建'socketObservable'。 –

0

那麼我終於放棄了在連接丟失時拋出錯誤並將此行更改爲observer.error({ type: WEBSOCKET_ERROR, error })observer.next({ type: WEBSOCKET_ERROR, error })。但我仍然想知道我在做什麼錯誤retry。任何幫助原代碼將不勝感激。