我建議考慮看看Akka。他們提供了一個更適合這種用例的Actor框架。由於沒有定義自己的ExecutorService接口實現,因此JDK提供的默認實現並不能對調度進行太多控制。
創建ExecutionServices的硬編碼陣列不會很動態或穩健特別是將有每一個ExecutionService線程池。我們可以用一個哈希映射替換這個數組,然後將它放在ExecutionService的自定義實現後面,這樣可以從調用者隱藏這些細節,但不能解決擁有這麼多線程池的線程浪費問題。
在阿卡,每個演員具有與之相關聯自己的消息隊列。每個Actor都有效地在自己的線程中運行,從隊列中一次處理一條消息。 Akka將管理跨多個Actor的線程共享。因此,如果要爲每個消息類型創建一個Actor,然後使用這些Actor將消息排隊,那麼您將獲得每個消息類型一次最多處理一個線程的目標,同時僅由一個池支持線程。
演示技術的:
在阿卡Maven的依賴性。
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.17</version>
</dependency>
Java 8代碼。複製並粘貼到Java文件中,然後在IDE中運行主要方法。
package com.softwaremosaic.demos.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class ActorDemo {
public static void main(String[] args) throws InterruptedException {
// The following partitioner will spread the requests over
// multiple actors, which I chose to demonstrate the technique.
// You will need to change it to one that better maps the the
// jobs to your use case. Remember that jobs that get mapped
// to the same key, will get executed in serial (probably
// but not necessarily) by the same thread.
ExecutorService exectorService = new ActorExecutionService(job -> job.hashCode()+"");
for (int i=0; i<100; i++) {
int id = i;
exectorService.submit(() -> System.out.println("JOB " + id));
}
exectorService.shutdown();
exectorService.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("DONE");
}
}
class ActorExecutionService extends AbstractExecutorService {
private final ActorSystem actorSystem;
private final Function<Runnable, String> partitioner;
private final ConcurrentHashMap<String,ActorRef> actors = new ConcurrentHashMap<>();
public ActorExecutionService(Function<Runnable,String> partitioner) {
this.actorSystem = ActorSystem.create("demo");
this.partitioner = partitioner;
}
public void execute(Runnable command) {
String partitionKey = partitioner.apply(command);
ActorRef actorRef = actors.computeIfAbsent(partitionKey, this::createNewActor);
actorRef.tell(command, actorRef);
}
private ActorRef createNewActor(String partitionKey) {
return actorSystem.actorOf(Props.create(ExecutionServiceActor.class), partitionKey);
}
public void shutdown() {
actorSystem.terminate();
}
public List<Runnable> shutdownNow() {
actorSystem.terminate();
try {
awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return Collections.emptyList();
}
public boolean isShutdown() {
return actorSystem.isTerminated();
}
public boolean isTerminated() {
return actorSystem.isTerminated();
}
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
actorSystem.awaitTermination();
return actorSystem.isTerminated();
}
}
class ExecutionServiceActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof Runnable) {
((Runnable) message).run();
} else {
unhandled(message);
}
}
}
NB上面的代碼將在未定義的順序打印1-100。由於配料(Akka爲了獲得額外的性能優勢),訂單看起來大多是連續的。然而,隨着不同線程穿插工作,您會看到一些隨機數。每個作業需要運行的時間越長,分配給阿卡線程池的多個線程,使用越多分區鍵和更底層的CPU內核,更多的隨機序列很可能成爲。
好,ExecutorService的還提供了ScheduledExecutorService的在那裏你可以scheduleAtFixedRate()或scheduleAtFixedDelay()之間進行選擇。我建議你將消息推送到10個不同的ArrayDeques中,每個ArrayDeques都用於特定的消息類型,在隊列模式(FIFO)中操作。每個ScheduledExecutorService使用給定的間隔或延遲操作它自己的消息隊列,直到您停止系統。這應該可以解決你的問題,我想。 – DiabolicWords
[Java:另一個多線程Executor中的單線程子執行程序]的可能重複(http://stackoverflow.com/questions/43284210/java-single-threaded-sub-executor-within-another-multi-threaded -executor) –