2017-03-06 79 views
0

背景:使用Vertx 3.3.3核心網絡在Java端作爲服務器,使用vertx的web-3.3.3-client.js作爲與客戶sockjsclient1.1.2.jsVertx JS Eventbus連接,然後關閉

問題:我成功地實現了從客戶端我的本地計算機或局域網上時將eventbus的連接。當我通過代理時,wss eventbus連接被阻止(在Firefox中,我看到「Firefox無法建立到」wss:// ...「的連接;在Chromium中,我看到」WebSocket連接到wss :// ...失敗:在WebSocket握手期間出錯:意外的響應代碼:400「,然後我看到」https://.../eventbus/.../xhr_send?t = ...加載失敗資源:服務器響應的狀態代碼爲500「),但是,我會收到一些數據(連接降級爲可接受的協議?)。緊接着,onclose發生火災,並且我失去了連接。我知道我正在成功地到達Java vertx服務器,因爲我的靜態web和API調用正在工作。

問題:我已經深入地瀏覽了Vertx和SockJS文檔。是否存在:

  1. 有關vertx如何在JS Eventbus連接中嘗試不同傳輸協議的文檔?
  2. 通過業務代理工作的JS Vertx Eventbus的例子?
  3. 實現事件總線連接的另一種方式,可能是指定SockJS協議來嘗試/使用? (我試圖用最簡單的方法創建一個事件總線連接,如文檔中的許多地方所示)
  4. 我需要在SockJS/Eventbus設置的Java端做些什麼?

在此先感謝您的任何建議/幫助!

編輯1:爲Java服務器和JavaScript Web客戶端添加以下代碼。網絡方面是非常基本的(什麼是失敗)。 Java方面使用Spring進行依賴注入和應用程序配置,具有Eventbus連接,一個API調用以及靜態Web內容。

從客戶端到服務器的API調用工作正常,並且服務器正確地獲取Web內容,因此訪問該工具正在工作。然而,代理導致WSS失敗(如預期),但降級到XHR的流失敗(我認爲)

的Javascript:

var EB; 
var URL; 
var APICall = "api/eventbus/publish/"; 
var IncomingAddress = "heartbeat-test"; 
var OutgoingAddress = "client-test"; 

function createConnection(){ 
    URL = $("#serveraddressinput").val(); //Getting url from html text box 
    console.log("Creating Eventbus connection at " + URL + "eventbus"); //Eventbus address is '<link>/eventbus' 
    EB = new EventBus(URL + "eventbus"); 

    testAPICall(); 

    EB.onopen = function(){ 
     console.log("Eventbus connection successfully made at " + URL + "eventbus"); 
     console.log("Registering Eventbus handler for messages at " + IncomingAddress); 

     EB.registerHandler(IncomingAddress, function(error, message){ 
      console.log("Received Eventbus message " + JSON.stringify(message)); 
    }; 

    EB.onclose = function(){ 
     console.log("Eventbus connection at " + URL + " has been lost"); 
     URL = ""; 
    }; 
} 

function testAPICall(){ 
    var link = URL + APICall + "heartbeat-test"; 
    console.log("Testing API call to " + link); 
    $.ajax({ 
     url: link, 
     type: 'POST', 
     data: JSON.stringify({"testFromClient": "Test message sent from Client via API Call"}), 
     dataType: 'json', 
     success: function (data, textStatus) { 
      console.log("API Call Success: " + JSON.stringify(data)); 
     }, 
     error: function (request, error) { 
      console.log("API Call ERROR: " + JSON.stringify(request) + " " + error); 
     } 
    }); 
} 

function sendTestMessage(){ 
    console.log("Sending test message to address " + OutgoingAddress); 
    EB.send(OutgoingAddress, "Testing 1, 2, 3..."); 
} 

的Java:

... 

import io.vertx.core.AbstractVerticle; 
import io.vertx.core.Future; 
import io.vertx.core.eventbus.EventBus; 
import io.vertx.core.http.HttpMethod; 
import io.vertx.core.http.HttpServerOptions; 
import io.vertx.core.json.Json; 
import io.vertx.core.json.JsonObject; 
import io.vertx.core.net.JksOptions; 
import io.vertx.ext.web.Router; 
import io.vertx.ext.web.RoutingContext; 
import io.vertx.ext.web.handler.BodyHandler; 
import io.vertx.ext.web.handler.CorsHandler; 
import io.vertx.ext.web.handler.StaticHandler; 
import io.vertx.ext.web.handler.sockjs.BridgeEvent; 
import io.vertx.ext.web.handler.sockjs.BridgeEventType; 
import io.vertx.ext.web.handler.sockjs.BridgeOptions; 
import io.vertx.ext.web.handler.sockjs.PermittedOptions; 
import io.vertx.ext.web.handler.sockjs.SockJSHandler; 
import org.apache.logging.log4j.Level; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.slf4j.Marker; 
import org.slf4j.MarkerFactory; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.stereotype.Service; 

@Service 
public class MyTestVerticle extends AbstractVerticle { 

    private final static Logger log = LoggerFactory.getLogger(MyTestVerticle.class); 

    final Level ACCESS = Level.forName("ACCESS", 450); 

    private boolean started; 

    private int port; 

    @Value("${webserver.testpath.enabled}") 
    private boolean testPathEnabled; 

    @Value("${webserver.urlpath.test}") 
    private String testUrlPath; 

    @Value("${webserver.filepath.test}") 
    private String testFilePath; 

    @Value("${webserver.caching.enabled}") 
    private boolean cachingEnabled; 

    @Value("${webserver.ssl.enabled}") 
    private boolean sslEnabled; 

    private BridgeOptions bridgeOptions; 

    private SockJSHandler sockJsHandler; 

    private Router router; 

    private JksOptions sslKeyStoreOptions; 

    private JksOptions sslTrustStoreOptions; 

    public MyTestVerticle() { 
     this.started = false; 
    } 

    @Override 
    public void start(Future<Void> fut) throws Exception { 
     log.info("start() -- starting Vertx Verticle with eventbus, API handler, and static file handler"); 

     // grab the router 
     router = getRouter(); 

     // enable CORS for the router 
     CorsHandler corsHandler = CorsHandler.create("*"); //Wildcard(*) not allowed if allowCredentials is true 
     corsHandler.allowedMethod(HttpMethod.OPTIONS); 
     corsHandler.allowedMethod(HttpMethod.GET); 
     corsHandler.allowedMethod(HttpMethod.POST); 
     corsHandler.allowedMethod(HttpMethod.PUT); 
     corsHandler.allowedMethod(HttpMethod.DELETE); 
     corsHandler.allowCredentials(false); 
     corsHandler.allowedHeader("Access-Control-Request-Method"); 
     corsHandler.allowedHeader("Access-Control-Allow-Method"); 
     corsHandler.allowedHeader("Access-Control-Allow-Credentials"); 
     corsHandler.allowedHeader("Access-Control-Allow-Origin"); 
     corsHandler.allowedHeader("Access-Control-Allow-Headers"); 
     corsHandler.allowedHeader("Content-Type"); 

     // enable handling of body 
     router.route().handler(BodyHandler.create()); 
     router.route().handler(corsHandler); 
     router.route().handler(this::handleAccessLogging); 

     // publish a payload to provided eventbus destination 
     router.post("/api/eventbus/publish/:destination").handler(this::publish); 

     // open up all for outbound and inbound traffic 
     bridgeOptions = new BridgeOptions(); 
     bridgeOptions.addOutboundPermitted(new PermittedOptions().setAddressRegex(".*")); 
     bridgeOptions.addInboundPermitted(new PermittedOptions().setAddressRegex(".*")); 
//  sockJsHandler = SockJSHandler.create(vertx).bridge(bridgeOptions); 
     sockJsHandler = SockJSHandler.create(vertx); 
     sockJsHandler.bridge(bridgeOptions, be -> { 
      try { 
       if (be.type() == BridgeEventType.SOCKET_CREATED) { 
        handleSocketOpenEvent(be); 
       } 
       else if(be.type() ==BridgeEventType.REGISTER) { 
        handleRegisterEvent(be); 
       } 
       else if(be.type() ==BridgeEventType.UNREGISTER) { 
        handleUnregisterEvent(be); 
       } 
       else if(be.type() ==BridgeEventType.SOCKET_CLOSED) { 
        handleSocketCloseEvent(be); 
       } 
      } catch (Exception e) { 

      } finally { 
       be.complete(true); 
      } 
     }); 
     router.route("/eventbus/*").handler(sockJsHandler); 

     if(testPathEnabled){ 
      router.route("/" + testUrlPath + "/*").handler(StaticHandler.create(testFilePath).setCachingEnabled(cachingEnabled)); 
     } 

     // create periodic task, pushing all current EventBusRegistrations 
     vertx.setPeriodic(1000, handler -> { 
      JsonObject obj =new JsonObject(); 
      obj.put("testMessage", "Periodic test message from server..."); 
      vertx.eventBus().publish("heartbeat-test", Json.encodePrettily(obj)); 
     }); 

     EventBus eb = vertx.eventBus(); 
     eb.consumer("client-test", message -> { 
      log.info("Received message from client: " + Json.encodePrettily(message.body()) + " at " + System.currentTimeMillis()); 
     }); 

     HttpServerOptions httpOptions = new HttpServerOptions(); 
     if(sslEnabled){ 
       httpOptions.setSsl(true); 
       httpOptions.setKeyStoreOptions(sslKeyStoreOptions); 
     } 

     log.info("starting web server on port: " + port); 
     vertx 
       .createHttpServer(httpOptions) 
       .requestHandler(router::accept).listen(
       port, 
       result -> { 
        if (result.succeeded()) { 
         setStarted(true); 
         log.info("Server started and ready to accept requests"); 
         fut.complete(); 
        } else { 
         setStarted(false); 
         fut.fail(result.cause()); 
        } 
       } 
     ); 
    } 

    private void handleSocketOpenEvent(BridgeEvent be){ 
     String host =be.socket().remoteAddress().toString(); 
     String localAddress = be.socket().localAddress().toString(); 
     log.info("Socket connection opened! Host: " + host + " Local address: " + localAddress); 
    } 

    private void handleRegisterEvent(BridgeEvent be){ 
     String host =be.socket().remoteAddress().toString(); 
     String localAddress = be.socket().localAddress().toString(); 
     String address = be.getRawMessage().getString("address").trim(); 
     log.info("Eventbus register event! Address: " + address + " Host: " + host + " Local address: " + localAddress); 
    } 

    private void handleUnregisterEvent(BridgeEvent be){ 
     String host =be.socket().remoteAddress().toString(); 
     String localAddress = be.socket().localAddress().toString(); 
     String address = be.getRawMessage().getString("address").trim(); 
     log.info("Eventbus unregister event! Address: " + address + " Host: " + host + " Local address: " + localAddress); 
    } 

    private void handleSocketCloseEvent(BridgeEvent be){ 
     String host =be.socket().remoteAddress().toString(); 
     String localAddress = be.socket().localAddress().toString(); 
     log.info("Socket connection closed! Host: " + host + " Local address: " + localAddress); 
    } 

    //Method handles logging at custom level for access logging to files 
    private void handleAccessLogging(RoutingContext routingContext){ 
     Marker accessMarker = MarkerFactory.getMarker("ACCESS"); 

     if(routingContext.normalisedPath().contains("/api")){ 
      log.info(accessMarker, "Api access log: request= " + routingContext.normalisedPath() + " source=" + routingContext.request().remoteAddress()); 
     } 
     else{ 
      log.info(accessMarker, "Web access log: path= " + routingContext.normalisedPath() + " source= " + routingContext.request().remoteAddress()); 
     } 

     routingContext.next(); 
    } 

    /** 
    * Accept a payload (anything) and publish to the provided destination 
    * 
    * @param routingContext 
    */ 
    private void publish(RoutingContext routingContext) { 
     String destination = routingContext.request().getParam("destination"); 
     String payload = routingContext.getBodyAsString(); 
     if ((destination == null) || (payload == null)) { 
      Exception e = new Exception("Missing arguments"); 
      routingContext.response().setStatusCode(406); 
      routingContext.fail(e); 
     } else { 
      log.info("API Call -> Publishing to destination: " + destination + " payload: " + payload); 
      vertx.eventBus().publish(destination, payload); 
      routingContext 
        .response() 
        .setStatusCode(200) 
        .putHeader("content-type", "application/json; charset=utf-8") 
        .end(payload); 
     } 
    } 

    public boolean isStarted() { 
     return started; 
    } 

    public void setStarted(boolean started) { 
     this.started = started; 
    } 

    public int getPort() { 
     return port; 
    } 

    public void setPort(int port) { 
     this.port = port; 
    } 

    public Router getRouter(){ 
     if(router == null){ 
      router = Router.router(vertx); 
     } 
     return router; 
    } 

    public void setRouter(Router router){ 
     this.router = router; 
    } 

    public void setSslOptions(JksOptions keyStoreOptions, JksOptions trustStoreOptions) { 
     this.sslKeyStoreOptions = keyStoreOptions; 
     this.sslTrustStoreOptions = trustStoreOptions; 
    } 
} 
+0

能否請您發佈連接到的WebSocket/EventBus你的JS代碼? –

+0

我正在測試一個簡單的示例項目,很快就會用代碼進行更新。 – wrslatz

+0

@AlexeySoshin添加了一些代碼,讓我知道你是否需要其他東西 – wrslatz

回答

1

此錯誤可以通過執行以下操作來解決:

  1. 在Java Verticle中,將Eventbus處理程序移動到t在任何其他處理程序之前。我相信BodyHandler或CorsHandler搞砸了,並導致500錯誤。

    ... 
    router.route("/eventbus/*").handler(sockJsHandler); 
    ... 
    // enable handling of body 
    router.route().handler(BodyHandler.create()); 
    router.route().handler(corsHandler); 
    router.route().handler(this::handleAccessLogging); 
    
    // publish a payload to provided eventbus destination 
    router.post("/api/eventbus/publish/:destination").handler(this::publish);