我使用Java的ExecutorService,Future和ConcurrentLinkedQueue完成與SQS類似的東西。
ExecutorService創建一個線程池,該線程池可以執行實現Callable接口並返回Future的類。當ExecutorService創建期貨時,我將它們推送到在線程中運行的ConcurrentLinkedQueue,並在期貨完成時處理結果。
實施檢查SQS和異步開始工作:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class SqsProcessor {
private static final int THREAD_COUNT = 100;
private ExecutorService _executor = null;
private FutureResultProcessor futureResultProcessor = null;
public SqsProcessor() {
_executor = Executors.newFixedThreadPool(THREAD_COUNT);
_futureResultProcessor = new FutureResultProcessor();
}
public void waitReceive() {
// Receive a SQS message
// Start the work related to the SQS message
Callable<MyWorkderResult> sqsWorker = new MyWorker(sqsMessage);
Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker);
// Send to the queue so the result can be processed when it completes
_futureResultProcessor.add(sqsFuture);
}
}
的類,它的工作:
import java.util.concurrent.Callable;
public class MyWorker implements Callable<MyWorkerResult> {
private String _sqsMessage = null;
public MyWorker(String sqsMessage) {
_sqsMessage = sqsMessage;
}
@Override
public MyWorkerResult call() throws Exception {
// Do work relating to the SQS message
}
}
舉行的工作成果:
public class MyWorkerResult {
// Results set in MyWorker call()
}
的ConcurrentLinkedQueue接收並處理未來的結果:
import java.util.concurrent.Future;
import java.util.concurrent.ConcurrentLinkedQueue;
public class FutureResultProcessor extends Thread {
private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>();
private final Integer CHECK_SLEEP = 300;
public FutureResultProcessor() {
}
public void run() {
while(true) {
Future<MyWorkerResult> myFuture = resultQueue.poll();
if(myFuture == null) {
// There's nothing to process
try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {}
continue;
}
// Process result
if(myFuture != null) {
MyFutureResult myFutureResult = myFuture.get();
// Process result
}
}
}
public void add(Future<MyWorkerResult> sqsFuture) {
resultQueue.offer(sqsFuture);
}
}
或者,您可以收集一組期貨並等待它們全部完成,然後再處理結果。
Akka可能是個不錯的選擇。我沒有直接使用它,但它提供了運行異步任務的框架,提供了錯誤處理,甚至可以將任務分配給遠程實例。