2017-04-11 54 views
4

要求:的Java:執行人服務與多個隊列

  1. 我有留言分爲不同類型e.g Type1, Type2 ... Type100
  2. 我想以並行執行不同類型的消息。我們假設在10個線程中,但是所有相同類型的消息必須一個接一個地執行。執行順序無關緊要。
  3. 一旦線程完成所有的TypeX消息。它應該開始處理另一個類型。

我經歷了不同的答案: 他們中的大多數人建議執行程序服務來處理多線程。 比方說,我們創建一個像

ExecutorService executorService = Executors.newFixedThreadPool(10); 

執行服務,但是一旦我們提交使用executorService.submit(runnableMessage);

的消息時,我們沒有得到過的消息的特定類型的任務只有一個特定的線程任何控制。

解決方案:

創建單線程執行人

ExecutorService[] pools = new ExecutorService[10]; 

和最初的陣列通過類型1,類型2 ...類型10 的消息,則如果任何執行已經完成執行然後分配Type11並繼續這樣做,直到所有類型得到處理。

有沒有更好的辦法做到這一點?

類似執行器服務與多個隊列在哪裏我可以推送每種類型的消息到不同的隊列?

+0

好,ExecutorService的還提供了ScheduledExecutorService的在那裏你可以scheduleAtFixedRate()或scheduleAtFixedDelay()之間進行選擇。我建議你將消息推送到10個不同的ArrayDeques中,每個ArrayDeques都用於特定的消息類型,在隊列模式(FIFO)中操作。每個ScheduledExecutorService使用給定的間隔或延遲操作它自己的消息隊列,直到您停止系統。這應該可以解決你的問題,我想。 – DiabolicWords

+0

[Java:另一個多線程Executor中的單線程子執行程序]的可能重複(http://stackoverflow.com/questions/43284210/java-single-threaded-sub-executor-within-another-multi-threaded -executor) –

回答

1

一種更簡單的解決方案可以是:

,而不是使每個消息可運行。 我們可以根據它們的類型創建組消息:

例如我們創建組別1TYPE1

class MessageGroup implements Runnable { 
    String type; 
    String List<Message> messageList; 

    @Override 
    public void run() { 
     for(Message message : MessageList) { 
     message.process(); 
     } 
    } 
} 

的所有消息,我們可以像

ExecutorService executorService = Executors.newFixedThreadPool(10); 

與固定線程創建平時執行的服務和不提交個人信息,我們可以提交組消息如

executorService.submit(runnableGroup); 

並且每個組都會執行messa相同類型的ges按順序在同一個線程中。

5

我建議考慮看看Akka。他們提供了一個更適合這種用例的Actor框架。由於沒有定義自己的ExecutorService接口實現,因此JDK提供的默認實現並不能對調度進行太多控制。

創建ExecutionServices的硬編碼陣列不會很動態或穩健特別是將有每一個ExecutionService線程池。我們可以用一個哈希映射替換這個數組,然後將它放在ExecutionService的自定義實現後面,這樣可以從調用者隱藏這些細節,但不能解決擁有這麼多線程池的線程浪費問題。

在阿卡,每個演員具有與之相關聯自己的消息隊列。每個Actor都有效地在自己的線程中運行,從隊列中一次處理一條消息。 Akka將管理跨多個Actor的線程共享。因此,如果要爲每個消息類型創建一個Actor,然後使用這些Actor將消息排隊,那麼您將獲得每個消息類型一次最多處理一個線程的目標,同時僅由一個池支持線程。

演示技術的:

在阿卡Maven的依賴性。

<dependency> 
     <groupId>com.typesafe.akka</groupId> 
     <artifactId>akka-actor_2.11</artifactId> 
     <version>2.4.17</version> 
    </dependency> 

Java 8代碼。複製並粘貼到Java文件中,然後在IDE中運行主要方法。

package com.softwaremosaic.demos.akka; 

import akka.actor.ActorRef; 
import akka.actor.ActorSystem; 
import akka.actor.Props; 
import akka.actor.UntypedActor; 

import java.util.Collections; 
import java.util.List; 
import java.util.concurrent.AbstractExecutorService; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.TimeUnit; 
import java.util.function.Function; 


public class ActorDemo { 

    public static void main(String[] args) throws InterruptedException { 
     // The following partitioner will spread the requests over 
     // multiple actors, which I chose to demonstrate the technique. 
     // You will need to change it to one that better maps the the 
     // jobs to your use case. Remember that jobs that get mapped 
     // to the same key, will get executed in serial (probably 
     // but not necessarily) by the same thread. 
     ExecutorService exectorService = new ActorExecutionService(job -> job.hashCode()+""); 

     for (int i=0; i<100; i++) { 
      int id = i; 
      exectorService.submit(() -> System.out.println("JOB " + id)); 
     } 

     exectorService.shutdown(); 
     exectorService.awaitTermination(1, TimeUnit.MINUTES); 

     System.out.println("DONE"); 
    } 

} 


class ActorExecutionService extends AbstractExecutorService { 

    private final ActorSystem        actorSystem; 
    private final Function<Runnable, String>    partitioner; 
    private final ConcurrentHashMap<String,ActorRef>  actors = new ConcurrentHashMap<>(); 

    public ActorExecutionService(Function<Runnable,String> partitioner) { 
     this.actorSystem = ActorSystem.create("demo"); 
     this.partitioner = partitioner; 
    } 


    public void execute(Runnable command) { 
     String partitionKey = partitioner.apply(command); 

     ActorRef actorRef = actors.computeIfAbsent(partitionKey, this::createNewActor); 

     actorRef.tell(command, actorRef); 
    } 

    private ActorRef createNewActor(String partitionKey) { 
     return actorSystem.actorOf(Props.create(ExecutionServiceActor.class), partitionKey); 
    } 


    public void shutdown() { 
     actorSystem.terminate(); 
    } 

    public List<Runnable> shutdownNow() { 
     actorSystem.terminate(); 

     try { 
      awaitTermination(1, TimeUnit.MINUTES); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     } 

     return Collections.emptyList(); 
    } 

    public boolean isShutdown() { 
     return actorSystem.isTerminated(); 
    } 

    public boolean isTerminated() { 
     return actorSystem.isTerminated(); 
    } 

    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 
     actorSystem.awaitTermination(); 

     return actorSystem.isTerminated(); 
    } 
} 

class ExecutionServiceActor extends UntypedActor { 
    public void onReceive(Object message) throws Exception { 
     if (message instanceof Runnable) { 
      ((Runnable) message).run(); 
     } else { 
      unhandled(message); 
     } 
    } 
} 

NB上面的代碼將在未定義的順序打印1-100。由於配料(Akka爲了獲得額外的性能優勢),訂單看起來大多是連續的。然而,隨着不同線程穿插工作,您會看到一些隨機數。每個作業需要運行的時間越長,分配給阿卡線程池的多個線程,使用越多分區鍵和更底層的CPU內核,更多的隨機序列很可能成爲。

+0

你可以分享一個示例代碼嗎? – niraj

+0

@niraj添加了演示代碼。任何問題只是霍拉。 –

+0

謝謝@Chris的演示代碼 – niraj

2

這裏是我的怎麼可能看起來非常簡單的例子。 您創建一個包含10個ArrayDeques的地圖,這些ArrayDeques由其「Typ」定址。 另外您還可以啓動10個ScheduledExecutors。 每個等待最初5秒,然後每200毫秒輪詢它的隊列。 在該當前的示例的輸出總是支持「而TYPEx的當前消息隊列:空」作爲隊列都是空的。

可是你現在可以得到它,並通過您的信息匹配的隊列。該服務將每隔200毫秒進行一次,並根據需要進行任何操作。 而且,當您使用隊列時,還會自動處理消息的處理順序。

import java.util.ArrayDeque; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

public class Messages { 

    public static void main(String[] args) { 

     Map<String, ArrayDeque<String>> messages = new HashMap<String, ArrayDeque<String>>(); 
     ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); 
     long initialDelay = 5000; 
     long period = 200; 

     // create 10 Queues, indexed by the type 
     // create 10 executor-services, focused on their message queue 
     for(int i=1; i<11; i++) { 
      String type = "Type" + i; 

      Runnable task =() -> System.out.println(
        "current message of " + type + ": " + messages.get(type).poll() 
      ); 

      messages.put(type, new ArrayDeque<String>()); 
      service.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS); 
     } 

    } 
} 
+0

它如何創建10個執行程序?當我想要在整個生命週期中執行它們時,以固定速率使用調度消息的意義何在? – niraj

+0

創建在循環內發生。 「service.scheduleAtFixedRate(task,initialDelay,period,TimeUnit.MILLISECONDS);」計劃...-方法被稱爲10次。每次通話都會創建一項新任務。這些任務定期運行,直到取消爲止,並每隔200毫秒檢查其隊列的內容。如果有新消息存在,它將被處理。 (您必須將我的可運行打印方法替換爲特定的消息處理代碼)。所以他們會在整個生命週期內處理你的信息。無論何時您將消息放入隊列中,只要應用程序運行,它們都將被處理。 – DiabolicWords

+0

也許你打算以其他方式設計或運行你的應用程序。但是我的代碼首先反映了我的理解。 - 您會收到您在特定類型中劃分的消息。這是我的排隊概念。你想轉發這些消息,並知道每種消息的類型。即使這是使用隊列和另外的10個預定的服務,其中每個服務只關心他的特定隊列,直到您取消爲止。 – DiabolicWords