2013-02-15 72 views
3

我在Java Web應用程序中擁有分層體系結構。 UI層只是Java,服務是類型化的Akka actors和外部服務調用(WS,DB等)被包裝在Hystrix命令中。如何使用Akka actors處理Java未來

用戶界面調用該服務,服務返回Akka未來。這是一個Akka的未來,因爲我想通過Akka期貨提供的onComplete和onFailure回調來簡化UI編碼。然後該服務創建一個未來,執行一些映射等,並將調用包裝爲一個返回Java未來的HystrixCommand。

在僞

所以:

UI

AkkaFuture future = service.getSomeData(); 

服務

public AkkaFuture getSomeData() { 
    return future { 
     JavaFuture future = new HystrixCommand(mapSomeData()).queue() 
     //what to do here, currently just return future.get() 
    } 
} 

的問題是,我想騰出服務男主角使用線程,只是領帶了解Hystrix使用的線程。但Java未來阻止了這一點,因爲我必須阻止它的完成。我能想到的唯一選擇(我不確定我喜歡)是不斷調查Java未來,並在Java未來完成時完成Akka未來。

注意:這個問題與Hystrix本身並沒有真正的關係,但是如果有人提出了一個與Hystrix特別相關的解決方案,我決定提一提。

回答

2

我正在用@Hbf標記答案作爲解決方案,因爲我最終做了一個Akka輪詢,如How do I wrap a java.util.concurrent.Future in an Akka Future?中所述。作爲參考,我也嘗試過:

  • 創建HystrixCommandExcutionHook並擴展HystrixCommand以允許回調。這不起作用,因爲鉤子沒有在正確的時間被調用。
  • 通過讓裝飾執行者在Hystrix內部創建期貨,然後從命令中投射期貨,使用Guavas可預測的未來。因爲Hystrix使用無法裝飾的ThreadPoolExecutor,所以不起作用。

編輯︰我添加下面的Akka輪詢代碼,因爲原來的答案是在斯卡拉和它掛起如果Java未來不會很好地取消。下面的解決方案在超時後總是離開線程。


    protected Future wrapJavaFutureInAkkaFuture(final java.util.concurrent.Future javaFuture, final Option maybeTimeout, final ActorSystem actorSystem) { 
     final Promise promise = Futures.promise(); 
     if (maybeTimeout.isDefined()) { 
      pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option.option(maybeTimeout.get().fromNow()), actorSystem); 
     } else { 
      pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, Option. none(), actorSystem); 
     } 

     return promise.future(); 
    } 

    protected void pollJavaFutureUntilDoneOrCancelled(final java.util.concurrent.Future javaFuture, final Promise promise, final Option maybeTimeout, final ActorSystem actorSystem) { 
     if (maybeTimeout.isDefined() && maybeTimeout.get().isOverdue()) { 
     // on timeouts, try to cancel the Java future and simply walk away 
     javaFuture.cancel(true); 
     promise.failure(new ExecutionException(new TimeoutException("Future timed out after " + maybeTimeout.get()))); 

     } else if (javaFuture.isDone()) { 
     try { 
      promise.success(javaFuture.get()); 
     } catch (final Exception e) { 
      promise.failure(e); 
     } 
     } else { 
      actorSystem.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), new Runnable() { 
      @Override 
      public void run() { 
      pollJavaFutureUntilDoneOrCancelled(javaFuture, promise, maybeTimeout, actorSystem); 
      } 
     }, actorSystem.dispatcher()); 
     } 
    }