2015-11-03 80 views
0

對於Rx專家,我有一個有趣的問題。我有一個關係表保存有關事件的信息。一個事件由id,發生的類型和時間組成。在我的代碼中,我需要在某個可能很寬的時間範圍內獲取所有事件。使用Rx批處理大型結果集

SELECT * FROM events WHERE event.time > :before AND event.time < :after ORDER  BY time LIMIT :batch_size 

爲了提高可靠性並處理大型結果集,我查詢批量大小的記錄:batch_size。現在,我想寫一個函數,給出:before和:after,將返回一個代表結果集的Observable。

Observable<Event> getEvents(long before, long after); 

在內部,函數應該批量查詢數據庫。沿時間尺度的事件分佈是未知的。因此,解決批處理的自然方法是: 如果結果不爲空,則獲取前N條記錄 ,將最後一條記錄的時間用作新的'before'參數,並提取下N條記錄;否則在 終止 如果結果不是空的,使用最後一個記錄的時間作爲一個新的'before'參數,並獲取下N個記錄;否則終止 ......等等(這個想法應該很清楚)

我的問題是:

是否有表達更高級別的可觀測元(過濾器/圖而言,這功能的方法/ flatMap /掃描/範圍等),而不明確使用訂閱者?

到目前爲止,我沒能做到這一點,並提出了以下簡單的代碼來代替:

private void observeGetRecords(long before, long after, Subscriber<? super Event> subscriber) { 
    long start = before; 
    while (start < after) { 
     final List<Event> records; 
     try { 
      records = getRecordsByRange(start, after); 
     } catch (Exception e) { 
      subscriber.onError(e); 
      return; 
     } 
     if (records.isEmpty()) break; 
     records.forEach(subscriber::onNext); 
     start = Iterables.getLast(records).getTime(); 
    } 

    subscriber.onCompleted(); 
} 

public Observable<Event> getRecords(final long before, final long after) { 
     return Observable.create(subscriber -> observeGetRecords(before, after, subscriber)); 
} 

這裏,getRecordsByRange實現使用DBI SELECT查詢並返回一個列表。此代碼工作正常,但缺乏高級Rx構造的優雅。

注意:我知道我可以通過DBI中的SELECT查詢返回Iterator。但是,我不想那樣做,而寧願運行多個查詢。這種計算不一定是原子的,所以事務隔離的問題是不相關的。

+0

我不完全理解爲什麼你會想從時間繼續最後一批。給定tstart,傾向於,如果此範圍內的數據以tdata結尾,爲什麼要從tdata查詢?目前tdata和趨勢之間沒有任何數據可用。 – akarnokd

+0

所以這就是爲什麼我檢查'records'列表是否爲空,並且如果是的話就從循環中斷開。 – GrumpyCat

回答

0

雖然我不完全理解爲什麼你要這樣的時間重用,這裏就是我會做它:

BehaviorSubject<Long> start = BehaviorSubject.create(0L); 

start 
.subscribeOn(Schedulers.trampoline()) 
.flatMap(tstart -> 
    getEvents(tstart, tstart + twindow) 
    .publish(o -> 
     o.takeLast(1) 
     .doOnNext(r -> start.onNext(r.time)) 
     .ignoreElements() 
     .mergeWith(o) 
    ) 
) 
.subscribe(...) 
+0

謝謝,我應該看看主題,以前沒有用過。 – GrumpyCat

+0

順便說一句,你的意思是時間重用?我只想以對getRecords函數的客戶端不透明的方式對SQL查詢進行批處理。 – GrumpyCat

+0

我的意思是你使用查詢的輸出來參數化下一個查詢。 – akarnokd