2017-02-21 40 views
-1

我一直堅持一個rxjs問題幾天了。我在REACT中使用rxjs,並嘗試獲取屬性更改流以運行異步函數以快速前進/後退播放器。然後進一步等待任何函數調用,直到回調返回。如果在等待回調期間流中發生任何屬性更改,只需執行最後一個屬性。然後重複這個過程。如何連續執行異步功能並非只有最新的流

下面是一個示例代碼如下。 http://jsbin.com/jagoworawu/edit?js,console

const setCurrentTimeStream = Observable.bindNodeCallback(player.setCurrentTime); 

// Instant execute setCurrentTimeStream on first iteration. 
// Prevent next setCurrentTimeStream if not callback of previous setCurrentTimeStream was called. 
// Skip all except last while waiting for setCurrentTimeStream callback and then execute it. 
// Then repeat the process. 
const lifecycle$ = props$ 
    .distinctUntilKeyChanged('currentTime') 
    // tried with audit and throttle but it won't execute the last one 
    .audit(({ currentTime }) => setCurrentTimeStream(currentTime)); 
    // Probably needs something more here? 

編輯: 閱讀一些資料後,我想出了這個。請提供反饋或其他解決方案,最好使用rxjs的本地方法。

const latestExecAsync = (input, action) => Observable.create((observer) => { 
    let queued = false; 
    let latestValue; 
    const dequeue = (reference) => { 
    queued = true; 
    action(reference.value, (error, value) => { 
     if (error) return output.error(error); 
     observer.next(reference.value); 
     if (reference !== latestValue) { 
     dequeue(latestValue); 
     } else { 
     queued = false; 
     } 
    }); 
    }; 
    const subscription = input.subscribe({ 
    next: (value) => { 
     const reference = latestValue = { value }; 
     if (!queued) dequeue(reference); 
    }, 
    error: e => observer.error(e), 
    complete:() => observer.complete(), 
    }); 
    return() => subscription.unsubscribe(); 
}); 

// Usage: 
latestExecAync(
    props$.distinctUntilKeyChanged('currentTime'), 
    ({ currentTime }, cb) => player.setCurrentTime(currentTime, cb) 
) 

回答

0

我終於想出了我一直在尋找的行爲。僅使用rxjs本機方法。 :)

const currentTime$ = props$.pluck('currentTime'); 
const $trailingCurrentTime = new Subject(); 

const stream$ = Observable 
    .merge(currentTime$, $trailingCurrentTime) 
    .audit(currentTime => setCurrentTime$(currentTime)) 
    .filter(currentTime => { 
    if (currentTime !== player.currentTime) { 
     $trailingCurrentTime.next(currentTime); 
    } 
    return currentTime === player.currentTime; 
    }); 

http://jsbin.com/wedefojuyo/edit?js,console,output