2016-03-01 58 views
4

我目前正在實施與阿卡-HTTP斷路器如下:斷路器Scala和阿卡-HTTP REST服務

def sendMail(entity: MyEntity): ToResponseMarshallable = { 

     Thread.sleep(5 * 1000) 
     validateEntity(entity).map[ToResponseMarshallable] { 
     case (body, subject) if !isEmpty(body, subject) => { 
      val mailResponse = sendMail(body, subject) 
      OK -> ProcessedEmailMessage(mailResponse) 
     } 
     case _ => 
      BadRequest -> s"error: for $entity".toJson 
     } 
    } catch { 
     case e: DeserializationException => HttpResponse(BadRequest).withEntity(HttpEntity(s"error:${e.msg}").withContentType(ContentTypes.`application/json`)) 
    } 
    } 

    val maxFailures: Int = 2 
    val callTimeout: FiniteDuration = 1 second 
    val resetTimeout: FiniteDuration = 30 seconds 

    def open: Unit = { 
    logger.info("Circuit Breaker is open") 
    } 

    def close: Unit = { 
    logger.info("Circuit Breaker is closed") 
    } 

    def halfopen: Unit = { 
    logger.info("Circuit Breaker is half-open, next message goes through") 

    private lazy val breaker = CircuitBreaker(
    system.scheduler, 
    maxFailures, 
    callTimeout, 
    resetTimeout 
).onOpen(open).onClose(close).onHalfOpen(halfopen) 

    def routes: Route = { 
    logRequestResult("email-service_aggregator_email") { 
     pathPrefix("v1") { 
     path("sendmail") { 
      post { 
      entity(as[EmailMessage]) { entity => 
       complete { 
       breaker.withCircuitBreaker(Future(sendMail(entity))) 
       } 
      } 
      } 
     } 
     } 
    } 
    } 

我的問題是,如果我使用breaker.withCircuitBreaker(Future(sendMail(entity)))斷路器進入開放狀態但其餘的響應返回There was an internal server error作爲響應

相反,如果我用breaker.withSyncCircuitBreaker(Future(sendMail(entity)))然後斷路器永不熄滅處於打開狀態,但它返回預期HttpResponse

如何解決此問題以觸發斷路器並返回正確的HTTP響應?

+0

也可以你發佈產生'斷路器'的代碼? –

+0

當然,我編輯 –

+0

你需要使用'onComplete'而不是'complete'。 'complete'指令要求響應立即準備就緒。在你的情況下,'withCircuitBreaker'返回'Future',所以'complete'將不是一個有效的選項。 'onComplete'指令設置爲與'Future'一起工作,所以在這裏更合適。然後在'onComplete'回調中你可以使用'complete'。 – cmbaxter

回答

1
entity(as[EmailMessage]) { entity => ctx => 
    val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity))) 
    val withErrorHandling = withBreaker.recover { 
     case _: CircuitBreakerOpenException => 
     HttpResponse(TooManyRequests).withEntity("Server Busy") 
    } 
    ctx.complete(withErrorHandling) 
} 
+0

它不工作,我得到相同的「有一個內部服務器錯誤」的消息。我試圖做一些調整,sendMail()方法直接返回一個未來,如果我沒有將它包含在斷路器中,它將起作用,未來就解決了。 –

+0

當您的呼叫超時時間爲1秒時,發送郵件至少需要5秒。這不會造成異常嗎? –

+0

它應該進入打開狀態,發生但沒有響應返回 –

1

我會提供了另一種不可能性的解決方案,因爲我相信onComplete是一個Future的結果,完成路由時打交道時要走的模式慣用方式:

entity(as[EmailMessage]) { entity => 
    val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity))) 

    onComplete(withBreaker){ 
    case Success(trm) => 
     complete(trm) 

    //Circuit breaker opened handling 
    case Failure(ex:CircuitBreakerOpenException) => 
     complete(HttpResponse(TooManyRequests).withEntity("Server Busy")) 

    //General exception handling 
    case Failure(ex) => 
     complete(InternalServerError) 
    } 
}