2017-05-03 100 views
0

我正在處理重載應用程序,並希望能夠將數據異步放入緩存。我的意思是像調用緩存中的操作,並收到代表結果的IgniteFuture<V>Ignite將數據異步放入緩存

我的意思是我需要能夠從持久性存儲中平行抽取數據並將它們放入緩存中。如果發生什麼事情,我可以嘗試再次提取數據(罰款不太多的數據,相當細緻)。

+0

什麼是你的問題?看起來你做了一個介紹,但忘了問題的最後一部分? –

回答

1

如果你沒有對IgniteFuture任何硬性要求(我相信不應該是這樣)和所有你想要的是某種機制來將數據放入高速緩存,並把它異步處理,然後處理結果通過該操作返回,那麼您可以使用Java's Executor Service

如果您不太瞭解Java的執行程序服務,那麼您可能需要閱讀文檔或this answer,其中強調了快速點,下面的示例中我還添加了註釋。

下面是關於線程的數量很少其他快速點:

  • 本例創建一個ExecutorService與「ThreadPoolExecutor」的實施,也有你確定有多少,它可以讓喜歡「Executors.newSingleThreadExecutor()」和「Executors.newFixedThreadPool(10)」其他選項你想在JVM中使用線程。
  • 您也可以選擇直接創建ThreadPoolExecutor的對象,如return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());,並且您可以更多地控制隊列的線程數和類型。您需要了解更多關於這一點,如果你想創建自己ThreadPoolExecutor對象,而不是通過與您實現ExecutorService

一些其他問題:

  • 當你正在處理的Future對象則是因爲Future.get()是一個阻塞調用,所以您的主線程將被阻塞,直到所有Future對象被返回並處理。
  • 如果你不想阻塞,那麼有幾個選項,你可以創建一個新的線程來完成所有這些處理,從而釋放你的主線程。另一種選擇可能是不處理Future對象,所以儘快你做executorService.submit(callableTask1),你的線程將是免費的,然後在你的CallableTask對象中,你可以將結果推送到隊列中(你可以根據需要選擇Java's queue implementation),然後從其他線程處理該隊列。還有一種選擇可能是專用另一個線程來處理你的Future對象。

示例代碼:

import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class ExecutorServiceFutureCallableExample { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     List<Future<String>> futuresList = new ArrayList<>(); 

     ExecutorService executorService = Executors.newCachedThreadPool(); 
     ExecutorServiceFutureCallableExample.CallableTask callableTask1 = new ExecutorServiceFutureCallableExample.CallableTask(2000); 
     ExecutorServiceFutureCallableExample.CallableTask callableTask2 = new ExecutorServiceFutureCallableExample.CallableTask(1000); 
     ExecutorServiceFutureCallableExample.CallableTask callableTask3 = new ExecutorServiceFutureCallableExample.CallableTask(3000); 

     System.out.println("### Starting submitting tasks"); 

     // submit the callable and register the returned future object so that it can be processed later. 
     futuresList.add(executorService.submit(callableTask1)); 
     futuresList.add(executorService.submit(callableTask2)); 
     futuresList.add(executorService.submit(callableTask3)); 

     System.out.println("### Finished submitting tasks"); 

     for (int i = 0; i < futuresList.size(); i++) { 
      // here "get()" waits for the future tasks to be returned. 
      System.out.println(futuresList.get(i).get()); 
     } 

     System.out.println("### Finished."); 
    } 

    static class CallableTask implements Callable<String>{ 

     private long timeToSleep; 

     CallableTask(long _timeToSleep){ 
      this.timeToSleep = _timeToSleep; 
     } 

     @Override 
     public String call() throws Exception { 
      String str = new Date() + ": Processing - " + this.hashCode() + " | " + Thread.currentThread() + ", slept for seconds - " + timeToSleep; 
      System.out.println(str); 
      Thread.sleep(timeToSleep); 
      return str + " ||||| completed at: " + new Date(); 
     } 

     public long getTimeToSleep() { 
      return timeToSleep; 
     } 

     public void setTimeToSleep(long timeToSleep) { 
      this.timeToSleep = timeToSleep; 
     } 

    } 
}