2014-01-31 46 views
5

我很難搞清楚如何處理來自Amazon SQS的消息。在Java中異步處理來自隊列的Amazon SQS消息

我想實現如下:

  1. 監聽器SQS從隊列
  2. 處理消息,並將其添加到DB
  3. 從隊列

刪除處理的消息讓我困擾的一個很多是如何實施步驟2.我有類SQSConnectorProfileDao。現在我想要簡單的實現,通過初始化SQSConnectorProfileDao並接收來自隊列的消息。我的想法是開始新的線程,開始輪詢消息,當隊列爲空時,從ProfileDao中斷線程。

返回/處理消息(回調函數?)的最佳方式是什麼?如果還有其他方法,我可以選擇。

謝謝

回答

3

我使用Java的ExecutorServiceFutureConcurrentLinkedQueue完成與SQS類似的東西。

ExecutorService創建一個線程池,該線程池可以執行實現Callable接口並返回Future的類。當ExecutorService創建期貨時,我將它們推送到在線程中運行的ConcurrentLinkedQueue,並在期貨完成時處理結果。

實施檢查SQS和異步開始工作:

import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class SqsProcessor { 

    private static final int THREAD_COUNT = 100; 
    private ExecutorService _executor = null; 
    private FutureResultProcessor futureResultProcessor = null; 

    public SqsProcessor() { 
     _executor = Executors.newFixedThreadPool(THREAD_COUNT); 
     _futureResultProcessor = new FutureResultProcessor(); 
    } 

    public void waitReceive() { 

     // Receive a SQS message 

     // Start the work related to the SQS message 
     Callable<MyWorkderResult> sqsWorker = new MyWorker(sqsMessage); 
     Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker); 

     // Send to the queue so the result can be processed when it completes 
     _futureResultProcessor.add(sqsFuture); 
    } 
} 

的類,它的工作:

import java.util.concurrent.Callable; 

public class MyWorker implements Callable<MyWorkerResult> { 

    private String _sqsMessage = null; 

    public MyWorker(String sqsMessage) { 
     _sqsMessage = sqsMessage; 
    } 

    @Override 
    public MyWorkerResult call() throws Exception { 
     // Do work relating to the SQS message 
    } 
} 

舉行的工作成果:

public class MyWorkerResult { 
    // Results set in MyWorker call() 
} 

的ConcurrentLinkedQueue接收並處理未來的結果:

import java.util.concurrent.Future; 
import java.util.concurrent.ConcurrentLinkedQueue; 

public class FutureResultProcessor extends Thread { 

    private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>(); 
    private final Integer CHECK_SLEEP = 300; 

    public FutureResultProcessor() { 
    } 

    public void run() { 
     while(true) { 
      Future<MyWorkerResult> myFuture = resultQueue.poll(); 

      if(myFuture == null) { 
       // There's nothing to process 
       try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {} 
       continue; 
      } 

      // Process result 
      if(myFuture != null) { 

       MyFutureResult myFutureResult = myFuture.get(); 

       // Process result 
      } 
     } 
    } 

    public void add(Future<MyWorkerResult> sqsFuture) { 
     resultQueue.offer(sqsFuture); 
    } 
} 

或者,您可以收集一組期貨並等待它們全部完成,然後再處理結果。

Akka可能是個不錯的選擇。我沒有直接使用它,但它提供了運行異步任務的框架,提供了錯誤處理,甚至可以將任務分配給遠程實例。