2017-08-15 65 views
0

並行請求有序流我正在尋找一種方式來使用流量來實現以下目標:在助焊劑

  1. 處理100萬(或更多)的請求流。
  2. 請求編號(從1到1,000,000)。
  3. 應該使用10個線程並行啓動請求。
  4. 啓動請求的順序取決於它們的序列號。
  5. 結果助焊劑應按照與請求相同的順序返回響應。
  6. 結果助焊劑應發出每個響應AS ASON AS它的所有前輩都可用。

我知道#4的答案是對調度程序使用單個執行程序。但是,我不知道如何實現#6。

下面是一個示例場景:1,2,3,4,5,6,7,8,9,10都推出

  • 請求。
  • 響應2到達(請求11啓動,因爲有線程可用)。
  • 響應5到達(請求12啓動)。
  • 響應4到達(請求13啓動)。
  • 響應1到達(請求14啓動)。
  • 響應1和2被髮射並開始處理它們。

  • 響應3到達(請求15啓動)。
  • 響應3,4,5被髮射並開始處理。

所以 - 我應該如何修改下面的代碼,以達到#6?

public class Example { 
 

 
private final Scheduler scheduler = Schedulers.fromExecutor(
 
\t Executors.newFixedThreadPool(10)); 
 

 
public void start() { 
 
\t Flux<Request> requestFlux = getFluxOfOneMillionRequests(); // Never mind how this is achieved 
 
\t 
 
\t Flux<Response> responseFlux = flux.flatMap(request -> doInWorkerThread(request)); 
 
\t 
 
\t flux.doOnNext(response -> processResponse(response)).subscribe() 
 
} 
 

 
private Mono<Response> doInWorkerThread(Request request) { 
 

 
\t return Mono.fromCallable(() -> { 
 

 
\t \t // Do something 
 
\t \t return new Response(request.getSerial(), someResult); 
 
\t }).subscribeOn(scheduler); 
 
} 
 

 
private void processResponse(Response response) { 
 
\t // Do something 
 
} 
 
}

回答

0

我想通了一定的答案,儘管我使出使用低級別的線程同步的代碼。 的想法是有3個助焊劑步驟:

  • 首先步驟處理請求在多個線程中,通過在調度器,其使用具有多個工作線程的執行器訂閱一個單聲道。當工作線程完成時,它將其響應放在一個公共列表中,並返回一些任意對象以激活第二步。
  • 第二步將工作線程的「信號」發送到單個收集線程。它通過訂購另一個上的Mono調度程序來實現,該調度程序使用單線程Executor。
  • 第三步在單個線程上運行。每次調用它時,都會從公共列表中提取按順序排列的響應,並將它們返回爲Flux。

對於多線程調度程序,我使用單個執行程序,以便按照它們插入的順序執行任務。這會降低緩存機制必須持有的平均響應數量。

這表現在下面的代碼片斷:

import com.google.common.collect.Lists; 
 
import com.google.common.util.concurrent.ThreadFactoryBuilder; 
 
import reactor.core.publisher.Flux; 
 
import reactor.core.publisher.Mono; 
 
import reactor.core.scheduler.Scheduler; 
 
import reactor.core.scheduler.Schedulers; 
 

 
import java.util.Collections; 
 
import java.util.List; 
 
import java.util.Random; 
 
import java.util.concurrent.Executors; 
 

 
public class StreamSort { 
 

 
    private final Scheduler produceScheduler = Schedulers.fromExecutor(Executors.newFixedThreadPool(3, new ThreadFactoryBuilder().setNameFormat("--> Producer-%d").build())); 
 
    private final Scheduler consumeScheduler = Schedulers.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("<-- Consumer-%d").build())); 
 
    private final ResponseReadySignal responseReadySignal = new ResponseReadySignal(); 
 
    private int nextExpected = 1; 
 
    private List<Response> arrivals = Lists.newArrayList(); 
 

 
    private static class ResponseReadySignal {} 
 
    private Random rand = new Random(); 
 

 
    public static void main(String[] args) throws InterruptedException { 
 

 
     StreamSort m = new StreamSort(); 
 

 
     m.start(); 
 
    } 
 

 
    private void start() throws InterruptedException { 
 

 
     Flux<Request> requests = generateRequestFlux(); 
 

 
     requests 
 
       .flatMap(request -> doInWorkerThread(request)) 
 
       .flatMap(signal -> jumpToCollectingThread(signal)) 
 
       .flatMap(signal -> departInOrder(signal)) 
 
       .doOnNext(response -> log("Got " + response)) 
 
       .doOnComplete(() -> stopWaiting()) 
 
       .subscribe(); 
 

 
     waitUntilFinished(); 
 

 
     log("Disposing schedulers"); 
 
     produceScheduler.dispose(); 
 
     consumeScheduler.dispose(); 
 
     log("FINISH"); 
 
    } 
 

 
    private synchronized void waitUntilFinished() throws InterruptedException { 
 
     this.wait(); 
 
    } 
 

 
    private synchronized void stopWaiting() { 
 
     this.notifyAll(); 
 
    } 
 

 
    private Flux<Request> generateRequestFlux() { 
 
     List<Integer> serials = Lists.newArrayList(1,2,3,4,5,6,7,8,9,10); 
 

 
     return Flux.fromIterable(serials).map(serial -> new Request(serial)); 
 
    } 
 

 
    private Mono<ResponseReadySignal> doInWorkerThread(Request request) { 
 

 
     return Mono.fromCallable(() -> { 
 

 
      Response response = handleRequest(request); 
 

 
      synchronized (this) { 
 
       arrivals.add(response); 
 
      } 
 

 
      return responseReadySignal; 
 
     }).subscribeOn(produceScheduler); 
 
    } 
 

 
    private Response handleRequest(Request request) throws InterruptedException { 
 
     int milli = rand.nextInt(1000); 
 

 
     log(request + " start, sleeping " + milli); 
 

 
     Thread.currentThread().sleep(milli); // Simulate some task that takes time 
 

 
     log(request + " Finished"); 
 

 
     return new Response(request.getSerial()); // Simulate response creation. 
 

 
    } 
 

 
    private Mono<ResponseReadySignal> jumpToCollectingThread(ResponseReadySignal signal) { 
 

 
     log("Delivering signal to collecting thread"); 
 

 
     return Mono.fromCallable(() -> signal).subscribeOn(consumeScheduler); 
 
    } 
 

 
    private Flux<Response> departInOrder(ResponseReadySignal signal) { 
 

 
     List<Response> readyToDepart = Lists.newLinkedList(); 
 

 
     synchronized (this) { 
 
      Collections.sort(arrivals); 
 
      while (arrivals.size() > 0 && arrivals.get(0).getSerial() == nextExpected) { 
 
       readyToDepart.add(arrivals.remove(0)); 
 
       ++nextExpected; 
 
      } 
 
     } 
 

 
     log("Departing " + readyToDepart.size() + " items: " + readyToDepart); 
 
     return Flux.fromIterable(readyToDepart); 
 
    } 
 

 
    private static void log(String message) { 
 
     Thread t = Thread.currentThread(); 
 
     System.out.println(t.getName() + ": " + message); 
 

 
    } 
 

 
    private static class WithSerial implements Comparable<WithSerial> { 
 
     private final int serial; 
 

 
     public WithSerial(int serial) { 
 
      this.serial = serial; 
 
     } 
 

 
     public int getSerial() { 
 
      return serial; 
 
     } 
 

 
     @Override 
 
     public int compareTo(WithSerial o) { 
 
      return this.serial - o.serial; 
 
     } 
 

 
     @Override 
 
     public String toString() { 
 
      return "" + getSerial(); 
 
     } 
 
    } 
 

 
    private static class Request extends WithSerial { 
 

 
     public Request(int serial) { 
 
      super(serial); 
 
     } 
 
    } 
 

 
    private static class Response extends WithSerial { 
 
     public Response(int serial) { 
 
      super(serial); 
 
     } 
 
    } 
 
}