2017-07-24 72 views
0

我試圖找到一種異步的方式,立即向客戶端返回請求響應。如何立即異步發送請求響應?

我只需要記錄請求數據,啓動新線程去請求其他服務器上的昂貴操作(一些後端操作),並且無需等待它們的響應,立即向客戶端返回 200 狀態響應。

目前,我嘗試用 CompletableFuture 來做到這一點,但我遺漏了某些東西。

package com.example.controller; 

import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.core.Logger; 

import javax.servlet.http.HttpServletRequest; 
import javax.ws.rs.Consumes; 
import javax.ws.rs.GET; 
import javax.ws.rs.Path; 
import javax.ws.rs.core.Context; 
import javax.ws.rs.core.MediaType; 
import javax.ws.rs.core.Response; 
import java.util.concurrent.CompletableFuture; 

/**
 * JAX-RS resource from the question: it tries to answer the client right away with an
 * empty 200 response while an expensive operation keeps running on another thread.
 *
 * NOTE(review): this is the version that exhibits the problem described in the surrounding
 * text - the background task reads the injected request after the resource method has
 * already returned, and the request values appear to be lost there.
 */
@Path("/class") 
public class AsynchronousResponse { 

// The logger is cast to the log4j core Logger type imported by this file.
private static final Logger LOGGER = (Logger) LogManager.getLogger(AsynchronousResponse.class.getName()); 
private static final int HTTP_STATUS_OK = 200; 
private static final String EMPTY_CONTENT = ""; 

// Servlet request injected by the JAX-RS runtime via @Context.
@Context 
private HttpServletRequest httpRequest; 

/**
 * Handles GET /class/method: logs the session id, submits
 * {@link #veryExpensiveOperations()} through {@code CompletableFuture.supplyAsync}
 * and returns without waiting for it.
 *
 * @return a response with status 200 and an empty string entity
 */
@Path("/method") 
@GET 
@Consumes(MediaType.APPLICATION_JSON) 
public Response asyncResponseSupply() { 
    LOGGER.info(httpRequest.getSession().getId() + " : New session received"); 
    // No executor is passed and the returned future is discarded (fire-and-forget).
    CompletableFuture.supplyAsync(this::veryExpensiveOperations); 
    LOGGER.info(httpRequest.getSession().getId() + " : Returning empty response... "); 
    return Response.status(HTTP_STATUS_OK).entity(EMPTY_CONTENT).build(); 
} 

// need to do some operations on data from httpRequest 
/**
 * Simulated long-running work: logs the request, sleeps for 3000 ms and reports the outcome.
 *
 * NOTE(review): invoked from the asynchronous task above, so httpRequest is read outside
 * the thread that handled the request - presumably the cause of the lost values; confirm
 * against the container's request lifecycle.
 *
 * @return "DONE" after the sleep finishes, or "ERROR" if the thread was interrupted
 */
private String veryExpensiveOperations() { 
    LOGGER.info(httpRequest.toString()); 
    LOGGER.info("Start very expensive operations"); 
    try { 
     Thread.sleep(3000); 
     LOGGER.info("Finished"); 
     return "DONE"; 
    } catch (InterruptedException e) { 
     // Restore the interrupt flag before reporting the failure.
     Thread.currentThread().interrupt(); 
     e.printStackTrace(); 
     LOGGER.error("Error: " + e.getMessage()); 
     return "ERROR"; 
    } 
} 

} 

總之,我確實立即得到了響應,但 veryExpensiveOperations() 方法似乎丟失了 HttpRequest 的值,這對我來說很糟糕,因爲我需要使用客戶端請求中的值去調用其他 Web 服務。感謝幫助!

我的應用程序使用的是 Jetty,版本 9.2.18.v20160721。

回答

2

一旦完成向servlet的分派,響應將被提交,並且請求和響應對象將被回收。

按照 Servlet 規範,您必須使用 AsyncContext(通過調用 HttpServletRequest.startAsync() 獲取)讓容器知道請求/響應尚未完成,並且其處理發生在非調度線程中(例如您的 CompletableFuture.supplyAsync() 調用)。

至於如何把 JAX-RS 和 AsyncContext 結合起來使用,我不知道。我甚至不知道 JAX-RS 是否已經更新以支持 AsyncContext。

0

感謝 Joakim 的回答(我無法給它點贊,因爲我是新用戶,沒有足夠的聲望)。根據你關於 AsyncContext 的提示,我找到了一種快速發送響應的方法。以下是我修改後的代碼(對 runAsync 和 supplyAsync 均正常工作):

package com.example.controller; 

import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.core.Logger; 

import javax.servlet.AsyncContext; 
import javax.servlet.http.HttpServletRequest; 
import javax.ws.rs.Consumes; 
import javax.ws.rs.GET; 
import javax.ws.rs.Path; 
import javax.ws.rs.core.Context; 
import javax.ws.rs.core.MediaType; 
import javax.ws.rs.core.Response; 
import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.ForkJoinPool; 


/**
 * JAX-RS resource that returns an empty 200 response right away while the expensive
 * back-end work continues on a background worker thread.
 *
 * <p>Each endpoint switches the servlet request into asynchronous mode before handing the
 * work to a shared pool. The worker reaches the request through the {@link AsyncContext}
 * it is given and always calls {@link AsyncContext#complete()} when it finishes.
 */
@Path("/test")
public class AsynchronousResponse {

private static final Logger LOGGER = (Logger) LogManager.getLogger(AsynchronousResponse.class.getName());
private static final int HTTP_STATUS_OK = 200;
private static final String EMPTY_CONTENT = "";

// One pool shared by every request: creating a new ForkJoinPool per call (and never
// shutting it down) piles up worker threads under load.
// asyncMode = true selects FIFO scheduling, intended for tasks that are never joined.
private static final ForkJoinPool WORKER_POOL = new ForkJoinPool(
        Runtime.getRuntime().availableProcessors(),
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null,
        true);

// Servlet request injected by the JAX-RS runtime; only touched on the request thread.
@Context
private HttpServletRequest httpRequest;

/**
 * Handles GET /test/supply: starts the background work with
 * {@code CompletableFuture.supplyAsync} and returns without waiting for it.
 *
 * @return a response with status 200 and an empty string entity
 */
@Path("/supply")
@GET
@Consumes(MediaType.APPLICATION_JSON)
public Response asyncResponseSupply() {
    String sessionId = httpRequest.getSession().getId();
    String loggerPrefix = sessionId + " :[SUPPLY] ";
    logRequestStart(loggerPrefix);
    // Kept in a local so the worker receives its own context instead of a mutable field.
    AsyncContext asyncContext = httpRequest.startAsync();
    CompletableFuture.supplyAsync(() -> veryExpensiveOperations(asyncContext, sessionId), WORKER_POOL);
    return emptyOkResponse(loggerPrefix);
}

/**
 * Handles GET /test/run: same as {@link #asyncResponseSupply()} but submits the work with
 * {@code CompletableFuture.runAsync}, ignoring the worker's result string.
 *
 * @return a response with status 200 and an empty string entity
 */
@Path("/run")
@GET
@Consumes(MediaType.APPLICATION_JSON)
public Response asyncResponseSupplyRun() {
    String sessionId = httpRequest.getSession().getId();
    String loggerPrefix = sessionId + " :[RUN] ";
    logRequestStart(loggerPrefix);
    AsyncContext asyncContext = httpRequest.startAsync();
    CompletableFuture.runAsync(() -> veryExpensiveOperations(asyncContext, sessionId), WORKER_POOL);
    return emptyOkResponse(loggerPrefix);
}

// Logs the arrival of a request together with the current active thread count.
private void logRequestStart(String loggerPrefix) {
    LOGGER.info(loggerPrefix + "New session received");
    LOGGER.info(loggerPrefix + "Active threads on controller init: " + Thread.activeCount());
}

// Logs the hand-off and builds the empty 200 response shared by both endpoints.
private Response emptyOkResponse(String loggerPrefix) {
    LOGGER.info(loggerPrefix + "Actual active Threads on controller return: " + Thread.activeCount());
    LOGGER.info(loggerPrefix + "Returning empty response... ");
    return Response.status(HTTP_STATUS_OK).entity(EMPTY_CONTENT).build();
}

/**
 * Simulated long-running work executed on a pool thread: logs the request, sleeps for
 * 3000 ms and reports the outcome.
 *
 * @param asyncContext context obtained from {@code startAsync()}; used to read the request
 *                     and completed here on every exit path
 * @param sessionId    session id captured on the request thread, used only for log prefixes
 * @return "DONE" after the sleep finishes, or "ERROR" if the thread was interrupted
 */
private String veryExpensiveOperations(AsyncContext asyncContext, String sessionId) {
    String loggerPrefix = sessionId + " :[veryExpensiveOperations] ";
    try {
        // need to do some operations on data from the request
        LOGGER.info(loggerPrefix + "Request toString: " + asyncContext.getRequest().toString());
        LOGGER.info(loggerPrefix + "Start very expensive operations");
        Thread.sleep(3000);
        LOGGER.info(loggerPrefix + "Thread sleep finished");
        LOGGER.info(loggerPrefix + "Actual active Threads: " + Thread.activeCount());
        return "DONE";
    } catch (InterruptedException e) {
        // Restore the flag only when an interrupt really happened; setting it on the
        // success path would leave the pooled worker marked as interrupted.
        Thread.currentThread().interrupt();
        LOGGER.error(loggerPrefix + "Error: " + e.getMessage(), e);
        return "ERROR";
    } finally {
        // Always release the request, even if an unexpected runtime exception escapes.
        asyncContext.complete();
    }
}

}