2016-09-30 65 views
0

我有一個會話列表,我必須調用webservice來在每個會話上設置一些屬性。CompletableFuture完成webservice調用並保存完成時的一切

我想調用webservice使用異步過程,並使用completablefuture爲它,以便當它完成後,我可以將它們全部保存在分貝。

我該怎麼做?到目前爲止,我的代碼如下,它不起作用。

sessions.stream() 
     .forEach(s -> CompletableFuture.runAsync(() -> webServiceCall(s), executor)); 
sessionService.saveAll(sessions); 

編輯:

我想出了這個解決方案,不知道這是做它的正確途徑。

List<CompletableFuture<Void>> futures = sessions.stream() 
      .map(s -> CompletableFuture.runAsync(() -> webServiceCall(s), executor)) 
      .collect(Collectors.toList()); 
     CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) 
         .join(); 
     sessionService.saveAll(sessions); 

我使用的加入,以確保它等待響應節約會議

回答

2

總之前返回 - 所有你需要的是這樣的 -

CompletableFuture.supplyAsync(this::supplySomething, ex).thenAccept(this::consumer); 

您需要一種方法是將調用執行程序(線程池)。在我的情況下,我的游泳池大小爲100.接下來,您需要根據需要多次致電您的供應商。

每次調用'supplier'都會創建一個任務。我正在創建10000個任務。他們每個人都會並行運行,每個人完成後都會打電話給我的'消費者'。

您的供應商應該返回某種對象,該對象持有來自webservice的響應。這個對象將成爲你的'消費者'方法的參數。

您可能想要在完成所有操作後(或中間)殺死池。

請參見下面的例子 -

package com.sanjeev.java8.thread; 

import java.io.BufferedReader; 
import java.io.DataOutputStream; 
import java.io.InputStreamReader; 
import java.io.Reader; 
import java.net.HttpURLConnection; 
import java.net.URL; 
import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class Caller { 

public static ExecutorService ex = Executors.newFixedThreadPool(100); 

public static void main(String[] args) throws InterruptedException { 
    Caller caller = new Caller(); 

    caller.start(); 

    ex.shutdown(); 
    ex.awaitTermination(10, TimeUnit.MINUTES); 
} 

private void start() { 
    for (int i = 0; i < 10000; i++) { 
     CompletableFuture.supplyAsync(this::supplySomething, ex).thenAccept(this::consumer); 
    } 
} 

private int supplySomething() { 
    try { 
     URL url = new URL("http://www.mywebservice.com"); 

     HttpURLConnection connection = (HttpURLConnection) url.openConnection(); 
     connection.setRequestMethod("POST"); 
     connection.setDoOutput(true); 
     connection.setDoInput(true); 

     connection.connect(); 

     try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) { 
      wr.write("supply-some-data".getBytes()); 
     } 

     Reader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); 

     for (int c; (c = in.read()) >= 0;) { 
      System.out.print((char) c); 
     } 

     in.close(); 

     // return the response code. I'm return 'int', you should return some sort of object. 
     return 200; 

    } catch (Exception e) { 
     e.printStackTrace(); 
     throw new RuntimeException(e); 
    } 
} 

public void consumer(Integer i) { 
    // This parameter should be of type 'your object' that supplier returned. 
    // I got the response; add it in the list or whatever.... 
} 

}

另一個例子可能適合您需要更好 -

public class Caller2 { 

public static ExecutorService ex = Executors.newFixedThreadPool(2); 
private static Iterator<String> addresses = Stream.of("www.google.com", "www.yahoo.com", "www.abc.com").collect(Collectors.toList()).iterator(); 
private static ArrayList<String> results = new ArrayList<>(); 

public static void main(String[] args) throws InterruptedException { 
    Caller2 caller = new Caller2(); 

    caller.start(); 

    ex.shutdown(); 
    ex.awaitTermination(1, TimeUnit.HOURS); 

    System.out.println(results); 
} 

private void start() { 
    while (addresses.hasNext()) { 
     CompletableFuture.supplyAsync(this::supplyURL, ex).thenAccept(this::consumer); 
    } 
} 

private String supplyURL() { 
    String url = addresses.next(); 
    // call this URL and return response; 
    return "Success"; 
} 

public void consumer(String result) { 
    results.add(result); 
}