-1
我一直堅持一個rxjs問題幾天了。我在REACT中使用rxjs,並嘗試獲取屬性更改流以運行異步函數以快速前進/後退播放器。然後進一步等待任何函數調用,直到回調返回。如果在等待回調期間流中發生任何屬性更改,只需執行最後一個屬性。然後重複這個過程。如何連續執行異步功能並非只有最新的流
下面是一個示例代碼如下。 http://jsbin.com/jagoworawu/edit?js,console
const setCurrentTimeStream = Observable.bindNodeCallback(player.setCurrentTime);
// Instant execute setCurrentTimeStream on first iteration.
// Prevent next setCurrentTimeStream if not callback of previous setCurrentTimeStream was called.
// Skip all except last while waiting for setCurrentTimeStream callback and then execute it.
// Then repeat the process.
const lifecycle$ = props$
.distinctUntilKeyChanged('currentTime')
// tried with audit and throttle but it won't execute the last one
.audit(({ currentTime }) => setCurrentTimeStream(currentTime));
// Probably needs something more here?
編輯: 閱讀一些資料後,我想出了這個。請提供反饋或其他解決方案,最好使用rxjs的本地方法。
const latestExecAsync = (input, action) => Observable.create((observer) => {
let queued = false;
let latestValue;
const dequeue = (reference) => {
queued = true;
action(reference.value, (error, value) => {
if (error) return output.error(error);
observer.next(reference.value);
if (reference !== latestValue) {
dequeue(latestValue);
} else {
queued = false;
}
});
};
const subscription = input.subscribe({
next: (value) => {
const reference = latestValue = { value };
if (!queued) dequeue(reference);
},
error: e => observer.error(e),
complete:() => observer.complete(),
});
return() => subscription.unsubscribe();
});
// Usage:
latestExecAync(
props$.distinctUntilKeyChanged('currentTime'),
({ currentTime }, cb) => player.setCurrentTime(currentTime, cb)
)