2017-07-04 101 views
0
var offset = 1; 
var limit = 500; 

var list = new Promise(function (resolve, reject) { 
    rets.getAutoLogoutClient(config.clientSettings, (client) => { 
    var results = client.search.query(SearchType, Class, Query, { 
     limit: limit, 
     offset: offset 
    }); 
    resolve(results); 
    }); 
}); 

var source = Rx.Observable.fromPromise(list); 

source.subscribe(results => console.log(results.count)); 

我正在做一個房地產網站,使用RETS。RXJS擴展運營商

我試圖做我的查詢是有限的從RETS服務器,運行此循環增加我的偏移量,直到我有我所有的數據。我不知道計數是什麼,直到我運行查詢並找到計數值。

我試圖使用擴展,但我不知道它是如何工作的。試圖做到這些多種方式,即使使用舊的時尚while循環,而無法與.then方法一起使用。所以我已經轉向RXJS,因爲我一直在Angular 4中使用它。

這是在express中完成的。我需要最終運行玉米作業來獲取更新的屬性,但是如果計數高於我的偏移量,我的問題是每次都提取所有數據並增加偏移量。因此,例如,運行帶有偏移的1查詢與500總的限制這裏是1690年在我的偏移所以,下次去將是:

offset += limit 

一旦我有我的數據,我需要保存它到MongoDB。我已經能夠成功做到這一點。它只是找到一種方法來獲取所有數據,而無需手動設置我的偏移量。

注意服務器的限制是2500,是的,我可以獲取這一切在一杆,但也有其他數據,如媒體,這可能大大超過2500

有什麼建議?

回答

0

對於RxJS,這實際上是一個相當常見的用例,因爲有很多分頁數據源,或者在一次可以請求的內容中有其他限制的來源。

我的兩分錢

在我看來expand可能是這個因爲你對分頁未知的數據源,您需要,以確定最終計至少一個查詢的最佳操作。如果你知道你將要查詢的數據量是多少,那麼使用類似mergeScan的東西,但我會離題。

建議的解決方案

這可能需要一點點努力繞到你的頭,所以我已經添加註解儘可能打破這一切是如何工作。 注意我還沒有真正測試過,所以請原諒我的任何語法錯誤。

// Your constant limit for any one query 
const limit = 500; 

// RxJS helper method that wraps the async call into an Observable 
// I am basing this on what I saw of your sample which leads me to believe 
// that this should work. 
const clientish = Rx.Observable.bindCallback(rets.getAutoLogoutClient); 

// A method wrapper around your query call that wraps the resulting promise 
// into a defer. 
const queryish = (client, params) => 
    // Note the use of defer here is deliberate, since the query returns 
    // a promise that will begin executing immediately, this prevents that behavior 
    // And forces execution on subscription. 
    Rx.Observable.defer(() => client.search.query(SearchType, Class, Query, params)); 

// This does the actual expansion function 
// Note this is a higher order function because the client and the parameters 
// are available at different times 
const expander = (client) => ({limit, count}) => 
    // Invoke the query method 
    queryish(client, {limit, count}) 
    // Remap the results, update offset and count and forward the whole 
    // package down stream 
    .map(results => ({ 
     limit, 
     count: results.count, 
     offset: offset + limit, 
     results 
    })); 


// Start the stream by constructing the client 
clientish(config.clientSettings) 
    .switchMap(client => 
    // This are the arguments for the initial call 
    Rx.Observable.of({limit, offset: 0}) 
     // Call the expander function with the client 
     // The second argument is the max concurrency, you can change that if needed 
     .expand(expander(client), 1) 

     // Expand will keep recursing unless you tell it to stop 
     // This will halt the execution once offset exceeds count, i.e. you have 
     // all the data 
     .takeWhile(({count, offset}) => offset < count) 

     // Further downstream you only care about the results 
     // So extract them from the message body and only forward them 
     .pluck('results') 
) 
    .subscribe(results => /*Do stuff with results*/); 
+0

這不起作用。 switchMap不是一個函數。 takeWhile有語法錯誤,rets客戶端需要更好地組合。 –

+0

@JoshuaScott你正在使用哪個版本的RxJS? – paulpdaniels

+0

我正在使用RXJS 5 –

0
const retsConnect = Rx.Observable.create(function(observer) { 
    rets.getAutoLogoutClient(config.clientSettings, client => { 
    return searchQuery(client, 500, 1, observer); 
    }); 
}); 

function searchQuery(client, limit, offset, observer) { 
    let currentOffset = offset === undefined || offset === 0 ? 1 : offset; 
    return client.search.query(SearchType, Class, Query, {limit: limit, offset: currentOffset}) 
    .then(results => { 
     offset += limit; 
     observer.next(results.maxRowsExceeded); 
     if (results.maxRowsExceeded) { 
     console.log(offset); 
     return searchQuery(client, limit, offset, observer); 
     } else { 
     console.log('Completed'); 
     observer.complete(); 
     } 
    }); 
} 

retsConnect.subscribe(val => console.log(val)); 

所以這是我在這裏試圖找到感覺了。我仍在調整這個過程。所以我期望做的是更多地分解searchQuery。不知道我是否應該將observer.next傳遞給那裏,所以我打算找出映射的位置並重新安裝返回的searchQuery。我不確定takeUntil會採取真實還是錯誤。我需要這些數據做的事情就是保存到mongodb中。所以我想我可以像這樣離開它,並將我的保存方法放在那裏,但我仍然想要弄清楚這一點。

注意:results.maxRowsExceeded在還有更多數據時返回true。所以一旦maxRows返回false,它將停止並且所有的數據都被獲取。