2017-03-09 88 views
1

我正在研究一個庫,它將採用一個對象DataRequest作爲輸入參數並基於該對象構建一個URL,然後調用我們的應用程序服務器使用apache http客戶端,然後將響應返回給使用我們庫的客戶。有些客戶會撥打executeSync方法獲得相同的功能,一些客戶會撥打我們的executeAsync方法來獲取數據。在多線程環境中並行執行每個子任務

  • executeSync() - 等待,直到我有一個結果,返回結果。
  • executeAsync() - 返回一個Future,如果需要,可以在其他事情完成後立即處理。

下面是我DataClient類具有以上兩種方法:

public class DataClient implements Client { 
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(16); 
    private CloseableHttpClient httpClientBuilder; 

    // initializing httpclient only once 
    public DataClient() { 
    try { 
     RequestConfig requestConfig = 
      RequestConfig.custom().setConnectionRequestTimeout(500).setConnectTimeout(500) 
       .setSocketTimeout(500).setStaleConnectionCheckEnabled(false).build(); 
     SocketConfig socketConfig = 
      SocketConfig.custom().setSoKeepAlive(true).setTcpNoDelay(true).build(); 

     PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = 
      new PoolingHttpClientConnectionManager(); 
     poolingHttpClientConnectionManager.setMaxTotal(300); 
     poolingHttpClientConnectionManager.setDefaultMaxPerRoute(200); 

     httpClientBuilder = 
      HttpClientBuilder.create().setConnectionManager(poolingHttpClientConnectionManager) 
       .setDefaultRequestConfig(requestConfig).setDefaultSocketConfig(socketConfig).build(); 
    } catch (Exception ex) { 
     // log error 
    } 
    } 

    @Override 
    public List<DataResponse> executeSync(DataRequest key) { 
    List<DataResponse> responsList = null; 
    Future<List<DataResponse>> responseFuture = null; 

    try { 
     responseFuture = executeAsync(key); 
     responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit()); 
    } catch (TimeoutException | ExecutionException | InterruptedException ex) { 
     responsList = 
      Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, 
       DataStatusEnum.ERROR)); 
     responseFuture.cancel(true); 
     // logging exception here 
    } 
    return responsList; 
    } 

    @Override 
    public Future<List<DataResponse>> executeAsync(DataRequest key) { 
    DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder); 
    return this.forkJoinPool.submit(task); 
    } 
} 

下面是我DataFetcherTask類也有一個靜態類DataRequestTask它通過使URL調用我們的應用服務器:

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> { 
    private final DataRequest key; 
    private final CloseableHttpClient httpClientBuilder; 

    public DataFetcherTask(DataRequest key, CloseableHttpClient httpClientBuilder) { 
    this.key = key; 
    this.httpClientBuilder = httpClientBuilder; 
    } 

    @Override 
    protected List<DataResponse> compute() { 
    // Create subtasks for the key and invoke them 
    List<DataRequestTask> requestTasks = requestTasks(generateKeys()); 
    invokeAll(requestTasks); 

    // All tasks are finished if invokeAll() returns. 
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size()); 
    for (DataRequestTask task : requestTasks) { 
     try { 
     responseList.add(task.get()); 
     } catch (InterruptedException | ExecutionException e) { 
     Thread.currentThread().interrupt(); 
     return Collections.emptyList(); 
     } 
    } 
    return responseList; 
    } 

    private List<DataRequestTask> requestTasks(List<DataRequest> keys) { 
    List<DataRequestTask> tasks = new ArrayList<>(keys.size()); 
    for (DataRequest key : keys) { 
     tasks.add(new DataRequestTask(key)); 
    } 
    return tasks; 
    } 

    // In this method I am making a HTTP call to another service 
    // and then I will make List<DataRequest> accordingly. 
    private List<DataRequest> generateKeys() { 
    List<DataRequest> keys = new ArrayList<>(); 
    // use key object which is passed in contructor to make HTTP call to another service 
    // and then make List of DataRequest object and return keys. 
    return keys; 
    } 

    /** Inner class for the subtasks. */ 
    private static class DataRequestTask extends RecursiveTask<DataResponse> { 
    private final DataRequest request; 

    public DataRequestTask(DataRequest request) { 
     this.request = request; 
    } 

    @Override 
    protected DataResponse compute() { 
     return performDataRequest(this.request); 
    } 

    private DataResponse performDataRequest(DataRequest key) { 
     MappingHolder mappings = DataMapping.getMappings(key.getType()); 
     List<String> hostnames = mappings.getAllHostnames(key); 

     for (String hostname : hostnames) { 
     String url = generateUrl(hostname); 
     HttpGet httpGet = new HttpGet(url); 
     httpGet.setConfig(generateRequestConfig()); 
     httpGet.addHeader(key.getHeader()); 

     try (CloseableHttpResponse response = httpClientBuilder.execute(httpGet)) { 
      HttpEntity entity = response.getEntity(); 
      String responseBody = 
       TestUtils.isEmpty(entity) ? null : IOUtils.toString(entity.getContent(), 
        StandardCharsets.UTF_8); 

      return new DataResponse(responseBody, DataErrorEnum.OK, DataStatusEnum.OK); 
     } catch (IOException ex) { 
      // log error 
     } 
     } 
     return new DataResponse(DataErrorEnum.SERVERS_DOWN, DataStatusEnum.ERROR); 
    } 
    } 
} 

對於每個DataRequest對象,都有一個DataResponse對象。現在有人通過傳遞DataRequest對象調用我們的庫,在內部我們製作List<DataRequest>對象,然後我們並行調用每個對象DataRequest,並返回List<DataResponse>,其中列表中的每個DataResponse對象都會響應對應的對象DataRequest對象。

下面是流量:

  • 客戶將通過傳遞DataRequest對象調用DataClient類。他們可以根據他們的要求調用executeSync()executeAsync()方法。
  • 現在在DataFetcherTask類(這是ForkJoinTask's亞型一個RecursiveTask一個),給定key對象,它是一個單一的DataRequest,我會產生List<DataRequest>,然後調用每個子任務並行爲列表中的每個對象DataRequest。這些子任務與父任務在相同的ForkJoinPool中執行。
  • 現在在DataRequestTask類中,我通過創建URL並將其DataResponse對象返回來執行每個DataRequest對象。

問題陳述:

因爲這個庫被稱爲一個非常高的吞吐量的環境,因此必須非常快。對於同步調用,在單獨的線程中執行可以嗎?這將導致線程的額外成本和資源以及線程的上下文切換成本,所以我有點混淆。此外,我在這裏使用ForkJoinPool這將節省我使用額外的線程池,但它是在這裏正確的選擇?

有沒有更好的和有效的方式來做同樣的事情,也可以提高性能?我使用的是Java 7,並且可以訪問Guava庫,所以如果它可以簡化任何事情,那麼我也可以開放它。

看起來我們在負載很重時會看到一些爭用。有沒有什麼辦法可以使這個代碼在非常重的負載下運行時進入線程爭用?

+0

聽起來像[ThreadPool](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html)會很有用,但請記住,過早優化是所有邪惡的來源 –

+0

@ScaryWombat同意,這就是爲什麼我會做負載測試,但問題是我使用ForkJoinPool這也是ThreadPool的專用形式是合理的。然後,我使用executeSync方法的方式是否正確? – john

+0

你看到了什麼樣的爭用?也許是重負載新的ForkJoinPool(16);'是不夠的,嘗試增加'16'到一個更大的值 – Teg

回答

0

我認爲在您的情況下,最好使用異步http調用,請參閱鏈接:HttpAsyncClient。而且你不需要使用線程池。

在executeAsync方法創建空CompletableFuture <DataResponse>(),並把它傳遞給客戶打電話,在回撥電話有通過調用完成它(或completeExceptionally如果異常募集)設置completableFuture的結果。 ExecuteSync方法實現看起來不錯。

編輯:

對於Java 7它是隻需要更換一個completableFuture承諾番石榴或實施,像ListenableFuture任何類似

0

使用ForkJoinPool是正確的選擇,其設計效率與許多小任務:

一個ForkJoinPool不同於其他類型的ExecutorService主要依靠僱用偷工減料:池中的所有線程試圖找到一個d執行提交給池的任務和/或由其他活動任務創建的任務(如果不存在,最終會阻止等待工作)。這可以在大多數任務產生其他子任務時(如同大多數ForkJoinTasks一樣)以及從外部客戶端將許多小任務提交給池時實現高效處理。特別是在構造函數中將asyncMode設置爲true時,ForkJoinPools可能適合用於永不連接的事件樣式任務。

我建議嘗試在構造函數中asyncMode = true因爲你的情況的任務從未加入:

public class DataClient implements Client { 
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(16, ForkJoinPool.ForkJoinWorkerThreadFactory, null, true); 
... 
} 

對於executeSync()可以使用forkJoinPool.invoke(task),這是做了同步的管理方式在游泳池任務執行的資源優化:

@Override 
public List<DataResponse> executeSync(DataRequest key) { 
    DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder); 
    return this.forkJoinPool.invoke(task); 
} 

如果你可以使用Java 8再有就是已經優化公共池:ForkJoinPool.commonPool()

+0

你能給我一個例子,我應該使用'asyncMode = true'嗎?另外我的'executeSync()'方法是怎麼樣的?對此有點混淆。 – john

+0

我在回答中添加了一些示例 – Teg

+0

因此,這意味着我不需要在'executeSync'內調用'executeAsync'方法並按照您的建議進行操作。如果是,那麼超時現在如何進入畫面?我的意思是同步如果調用超時,然後我返回超時響應。這將如何工作? – john

相關問題