2016-02-05 128 views
15

我不明白如何有效地使用AsyncRestTemplate進行外部服務呼叫。對於下面的代碼:如何使用AsyncRestTemplate同時進行多個呼叫?

class Foo { 

    public void doStuff() { 
     Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(
       url1, String.class); 
     String response1 = future1.get(); 

     Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
       url2, String.class); 
     String response2 = future2.get(); 

     Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(
       url3, String.class); 
     String response3 = future3.get(); 
    } 
} 

理想我想同時執行所有3調用和處理結果一旦他們全部完成。 然而每個外部服務調用牽強,直到get()的調用,但get()被阻止。那麼這是不是破壞了AsyncRestTemplate的目的?我不如使用RestTemplate

所以我不知道如何讓他們同時執行?

+1

請參閱此鏈接中的示例(http://www.concretepage.com/spring-4/spring-4-asyncresttemplate-listenablefuture-example),如果有幫助 –

+0

@vineethsivan鏈接不會回答我的問題。該鏈接僅顯示如何調用future.get()。 – Glide

回答

2

您可能需要使用CompletableFuture類(javadoc)。

  1. 將您的呼叫轉換爲CompletableFuture。例如。

    final CompletableFuture<ResponseEntity<String>> cf = CompletableFuture.supplyAsync(() -> { 
        try { 
         return future.get(); 
        } catch (InterruptedException | ExecutionException e) { 
         throw new RuntimeException(e); 
        } 
    }); 
    
  2. 下一頁呼叫CompletableFuture::allOf方法與3和新創建completable期貨。

  3. 請致電join()方法的結果。所產生的completable未來解決後,你可以從每個單獨completable未來的結果你在步驟3中創建

11

根本就不叫調度所有的異步調用的前堵get()

class Foo { 
    public void doStuff() { 
    ListenableFuture<ResponseEntity<String>> future1 = asyncRestTemplate 
     .getForEntity(url1, String.class); 
    ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate 
     .getForEntity(url2, String.class); 
    ListenableFuture<ResponseEntity<String>> future3 = asyncRestTemplate 
     .getForEntity(url3, String.class); 

    String response1 = future1.get(); 
    String response2 = future2.get(); 
    String response3 = future3.get(); 
    } 
} 

你都可以做派遣得到循環,但要注意,目前的結果是採集效率低下,因爲它會卡住的下一個未完成的未來。

您可以將所有期貨添加到集合中,然後遍歷它,測試每個未來的非阻塞isDone()。當該呼叫返回時,您可以致電get()

這種方式可以優化您的整體搜索結果,而不是按照調用get() s的順序等待下一個緩慢的未來結果。

更好的是,您還可以在AccyncRestTemplate返回的每個ListenableFuture之內註冊回調(運行時),您不必擔心週期性檢查潛在結果。

+0

你可以確認何時調用getForEntity(),它會啓動連接嗎?我可以通過Charles Proxy看到我的網絡流量,直到future.get()被調用時纔看到連接。 – Glide

5

如果您不必使用'AsyncRestTemplate',我會建議使用RxJava來代替。 RxJava zip運營商是你在找什麼。檢查下面的代碼:

private rx.Observable<String> externalCall(String url, int delayMilliseconds) { 
    return rx.Observable.create(
      subscriber -> { 
       try { 
        Thread.sleep(delayMilliseconds); //simulate long operation 
        subscriber.onNext("response(" + url + ") "); 
        subscriber.onCompleted(); 
       } catch (InterruptedException e) { 
        subscriber.onError(e); 
       } 
      } 
    ); 
} 

public void callServices() { 
    rx.Observable<String> call1 = externalCall("url1", 1000).subscribeOn(Schedulers.newThread()); 
    rx.Observable<String> call2 = externalCall("url2", 4000).subscribeOn(Schedulers.newThread()); 
    rx.Observable<String> call3 = externalCall("url3", 5000).subscribeOn(Schedulers.newThread()); 
    rx.Observable.zip(call1, call2, call3, (resp1, resp2, resp3) -> resp1 + resp2 + resp3) 
      .subscribeOn(Schedulers.newThread()) 
      .subscribe(response -> System.out.println("done with: " + response)); 
} 

對外部服務的所有請求都將在單獨的線程,當最後一次通話將完成轉換功能(例如簡單的字符串連接中)執行將被應用和結果(連接字符串)將被emmited從'zip'可觀察。

5

我的理解你的問題是你有一個預定義的異步方法,你試圖做的是使用RestTemplate類異步調用這個方法。

我已經寫了一個方法,可以幫助你調用你的方法asynchoronously。

public void testMyAsynchronousMethod(String... args) throws Exception { 
     // Start the clock 
     long start = System.currentTimeMillis(); 

     // Kick of multiple, asynchronous lookups 
     Future<String> future1 = asyncRestTemplate 
     .getForEntity(url1, String.class);; 
     Future<String> future2 = asyncRestTemplate 
     .getForEntity(url2, String.class); 
     Future<String> future3 = asyncRestTemplate 
     .getForEntity(url3, String.class); 

     // Wait until they are all done 
     while (!(future1 .isDone() && future2.isDone() && future3.isDone())) { 
      Thread.sleep(10); //10-millisecond pause between each check 
     } 

     // Print results, including elapsed time 
     System.out.println("Elapsed time: " + (System.currentTimeMillis() - start)); 
     System.out.println(future1.get()); 
     System.out.println(future2.get()); 
     System.out.println(future3.get()); 
    } 
-1

我想你在這裏誤解了一些東西。當您調用getForEntity方法時,請求已被觸發。當調用將來對象的get()方法時,您只需等待請求完成即可。因此,爲了防火同一一秒所有這三個要求,你就必須做到:

// Each of the lines below will fire an http request when it's executed 
Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(url1, String.class); 
Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(url2, String.class); 
Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(url3, String.class); 

所有這些代碼運行後,所有的請求已經被解僱(在相同的一秒最有可能)。那麼你可以在這期間做任何你想做的事情。只要調用任何get()方法,就等待每個請求完成。如果它們已經完成,那麼它將立即返回。

// do whatever you want in the meantime 
// get the response of the http call and wait if it's not completed 
String response1 = future1.get(); 
String response2 = future2.get(); 
String response3 = future3.get(); 
+0

任何人都可以確認何時調用'getForEntity()',請求已被觸發?我可以通過Charles Proxy看到我的網絡流量,直到調用future.get()時纔看到連接。 – Glide

+0

我不知道爲什麼它不適合你,但請檢查http://javattitude.com/2014/04/20/using-spring-4-asyncresttemplate/他們有教程和測試的東西。所以它應該工作。 – Rowanto

0

我不認爲任何以前的答案實際上實現了並行性。 @diginoise響應的問題在於它實際上並沒有實現並行性。只要我們撥打get,我們就會被封鎖。考慮到電話真的很慢,因此future1需要3秒才能完成,future2秒和future3再次需要3秒。隨着3 get呼叫一個接一個,我們最終等待3 + 2 + 3 = 8秒。 @Vikrant Kashyap答案塊以及while (!(future1 .isDone() && future2.isDone() && future3.isDone()))。除了while循環是3期的一段相當醜陋的代碼,如果你有更多的代碼呢? @lkz答案使用的技術與您所要求的不同,即使如此,我也不確定zip是否可以完成這項工作。從Observable的Javadoc:

拉鍊適用於嚴格的順序這種功能,所以通過新可觀察發出的第一項 將被應用到由每個源可觀察量發射的第一項目的功能 的結果; 由新Observable發射的第二項將是 函數的結果應用於每個發射的第二項的函數 Observable;等等。

由於Spring的廣泛流行,他們非常努力地維護向後兼容性,並且這樣做有時會與API做出妥協。 AsyncRestTemplate返回的方法ListenableFuture就是這樣一種情況。如果他們承諾使用Java 8+,則可以使用CompletableFuture。爲什麼?由於我們不會直接處理線程池,所以我們沒有一個很好的方法來知道所有ListenableFutures何時完成。 CompletableFuture有一個allOf方法創建新的CompletableFuture,當所有給定的CompletableFutures完成時完成。由於我們在ListenableFuture沒有,我們將不得不即興。 我沒有編譯下面的代碼,但它應該清楚我想要做什麼。我使用Java 8,因爲它是2016年

// Lombok FTW 
@RequiredArgsConstructor 
public final class CounterCallback implements ListenableFutureCallback<ResponseEntity<String>> { 
    private final LongAdder adder; 

    public void onFailure(Throwable ex) { 
    adder.increment(); 
    } 
    public void onSuccess(ResponseEntity<String> result) { 
    adder.increment(); 
    } 
} 

ListenableFuture<ResponseEntity<String>> f1 = asyncRestTemplate 
     .getForEntity(url1, String.class); 
f1.addCallback(//); 
// more futures 

LongAdder adder = new LongAdder(); 
ListenableFutureCallback<ResponseEntity<String>> callback = new CounterCallback(adder); 
Stream.of(f1, f2, f3) 
    .forEach {f -> f.addCallback(callback)} 

for (int counter = 1; adder.sum() < 3 && counter < 10; counter++) { 
    Thread.sleep(1000); 
} 
// either all futures are done or we're done waiting 
Map<Boolean, ResponseEntity<String>> futures = Stream.of(f1, f2, f3) 
    .collect(Collectors.partitioningBy(Future::isDone)); 

末我們現在已經一個Map爲其futures.get(Boolean.TRUE)會給我們已經完成了所有的期貨和futures.get(Boolean.FALSE)會給我們沒有的人。我們將要取消那些沒有完成的。

此代碼做了幾件事情,是很重要的並行編程:

  1. 它不會阻止。
  2. 它將操作限制在某個最大允許時間內。
  3. 它明確區分成功和失敗的情況。
+1

我不認爲@ diginoise的解決方案會花費「3 + 2 + 3 = 8秒」。當調用getForEntity時,所有的http請求都會被髮送出去,所以對future1,future2,future3的實際響應最多需要三個請求,在這種情況下最多爲3秒(max(3,2 ,3)) – simomo

+0

@simomo'直到'get'被調用纔會解決Future's。這是異步101. –

+0

嗨@Abhijit,當調用getForEntity時,http請求將被提交給'taskExecutor',然後taskExecutor發起http請求(大多數情況下立即)。這是[代碼](https://github.com/spring-projects/spring-framework/blob/a5b94f3a776c16ce3eb09ac92a9a7907910f5ff5/spring-web/src/main/java/org/springframework/http/client/SimpleBufferingAsyncClientHttpRequest.java#L79 )。所以我的觀點是發送http請求是一個調用'getForEntity'的異步進程,但它肯定沒有調用get的方法。 – simomo

相關問題