2016-11-27 60 views
2

我有一個List<Task>,其中Task是一個接口,其中一個方法返回Map<String, JsonElement>。我怎樣才能並行執行List<Task>,並返回一個新的HashMap,每個組合的結果爲Task使用RxJava將並行任務列表的結果組合到單個HashMap中

目前,我有這樣的:

List<Task> tasks = getTasks(); 

Observable.from(tasks) 
    .flatMap(new Func1<Task, Observable<Map<String, JsonElement>>>() { 
     @Override 
     public Observable<Map<String, JsonElement>> call(Task task) { 
      return Observable.just(task.get()); 
     } 
    }); 

// group into single Map<String,JsonElement> 
// create Observable<Map<String,JsonElement>> with all results 
+0

通過什麼值你想要在新的'HashMap'中組合/分組你的'Task'? –

+0

每個Task.get()的結果都應該使用''map.putAll()'''在新的'''HashMap''中進行合併。 –

回答

3

使用defer封裝Map每股認購和Scheduler基於大小的線程池,你想:

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5)); 
Observable.defer(() -> { 
    final Map<String, JsonElement> map = new ConcurrentHashMap<>(); 
    return Observable 
    .from(tasks) 
    .flatMap(task -> 
     Observable 
     .fromCallable(task -> task.get()) 
     .doOnNext(mp -> map.putAll(mp)) 
     .subscribeOn(scheduler)) 
    .ignoreElements() 
    .concatWith(Observable.just(map)); 
}); 

注意,選擇的Scheduler將取決於正在執行的任務的性質。如果CPU占主導地位,您可能會對Schedulers.computation()感到滿意。

相關問題