2016-05-13 279 views
3

Spring Cloud Stream @ServiceActivator does not send messages to errorChannel on exception

I am using Spring Cloud Stream with spring-cloud-starter-stream-kafka. I have bound my channels to Kafka topics in application.properties as follows:

spring.cloud.stream.bindings.gatewayOutput.destination=received 
spring.cloud.stream.bindings.enrichingInput.destination=received 
spring.cloud.stream.bindings.enrichingOutput.destination=enriched 
spring.cloud.stream.bindings.redeemingInput.destination=enriched 
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed 
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed 
spring.cloud.stream.bindings.error.destination=errors12 
spring.cloud.stream.bindings.errorInput.destination=errors12 
spring.cloud.stream.bindings.errorOutput.destination=errors12 

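I cannot get my program to publish exception messages to the error channel. Surprisingly, it does not even seem to try, even though the failure happens on a different thread (I have a @MessagingGateway that dumps messages into gatewayOutput, and the rest of the flow then runs asynchronously). Here is the definition of my ServiceActivator: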

@Named 
@Configuration 
@EnableBinding(Channels.class) 
@EnableIntegration 
public class FulfillingServiceImpl extends AbstractBaseService implements
        FulfillingService {

    @Override
    @Audit(value = "annotatedEvent")
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
    public void fulfill(TrivialRedemption redemption) throws Exception {

        logger.error("FULFILLED!!!!!!");

        throw new Exception("test exception");

    }
}

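Here is the generated log (I have truncated the full exception). There is no...

  • complaint about errorChannel having no subscribers
  • logging from a Kafka producer thread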
 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud[email protected]2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.spr[email protected]64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {} 
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {} 
2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {} 
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {} 
... 
... 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {} 
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {} 
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {} 

Edit: here is the content of my Channels class:

public interface Channels {

public static final String GATEWAY_OUTPUT = "gatewayOutput"; 

public static final String ENRICHING_INPUT = "enrichingInput"; 
public static final String ENRICHING_OUTPUT = "enrichingOutput"; 

public static final String REDEEMING_INPUT = "redeemingInput"; 
public static final String REDEEMING_OUTPUT = "redeemingOutput"; 

public static final String FULFILLING_INPUT = "fulfillingInput"; 
public static final String FULFILLING_OUTPUT = "fulfillingOutput"; 

@Output(GATEWAY_OUTPUT) 
MessageChannel gatewayOutput(); 

@Input(ENRICHING_INPUT) 
MessageChannel enrichingInput(); 

@Output(ENRICHING_OUTPUT) 
MessageChannel enrichingOutput(); 

@Input(REDEEMING_INPUT) 
MessageChannel redeemingInput(); 

@Output(REDEEMING_OUTPUT) 
MessageChannel redeemingOutput(); 

@Input(FULFILLING_INPUT) 
MessageChannel fulfillingInput(); 

@Output(FULFILLING_OUTPUT) 
MessageChannel fulfillingOutput();

}

Answers

0

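You don't show your Channels class, but the binder does not know that your "error" channel is "special".

The binder can be configured to retry and send exceptions to a dead-letter topic; see this PR in 1.0.0.RELEASE.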
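To make the "retry, then dead-letter" idea concrete, here is a minimal plain-Java sketch. It deliberately uses no Spring or Kafka types, so it only models the behavior and is not the binder's implementation. The names `RetryThenDeadLetter`, `deliver`, `ThrowingHandler` and `DeadLetter` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of "retry, then dead-letter"; all names here are illustrative. */
final class RetryThenDeadLetter {

    /** A handler that may fail, like a service method declared with "throws Exception". */
    @FunctionalInterface
    interface ThrowingHandler<T> {
        void handle(T payload) throws Exception;
    }

    /** A payload that exhausted its attempts, plus the exception from the last attempt. */
    static final class DeadLetter<T> {
        final T payload;
        final Exception cause;

        DeadLetter(T payload, Exception cause) {
            this.payload = payload;
            this.cause = cause;
        }
    }

    /**
     * Calls the handler up to maxAttempts times. Returns true as soon as one
     * attempt succeeds; otherwise passes the payload to the dead-letter sink
     * and returns false.
     */
    static <T> boolean deliver(T payload, int maxAttempts, ThrowingHandler<T> handler,
                               Consumer<DeadLetter<T>> deadLetters) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.handle(payload);
                return true;
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        deadLetters.accept(new DeadLetter<>(payload, last));
        return false;
    }

    public static void main(String[] args) {
        List<DeadLetter<String>> deadLetterTopic = new ArrayList<>();
        // This handler always fails, like the fulfill() method in the question.
        boolean delivered = deliver("redemption-1", 3,
                payload -> { throw new Exception("test exception"); },
                deadLetterTopic::add);
        System.out.println("delivered=" + delivered
                + ", deadLetters=" + deadLetterTopic.size());
    }
}
```

In the real binder the dead-letter sink would be a Kafka topic rather than an in-memory list; the point is only that the failed payload is captured after the last attempt instead of being logged and dropped.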

Alternatively, you can add a "mid-flow" gateway in front of the service activator; think of it as a "try/catch" block in Java:

@MessageEndpoint 
public static class GatewayInvoker { 

    @Autowired 
    private ErrorHandlingGateway gw; 

    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT) 
    public void send(Message<?> message) { 
        this.gw.send(message);
    } 

} 

@Bean 
public GatewayInvoker gate() { 
    return new GatewayInvoker(); 
} 

@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS) 
public interface ErrorHandlingGateway { 

    void send(Message<?> message); 

} 

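Change the input channel of your service activator to toService.

You must also add @IntegrationComponentScan to your configuration class so that the framework can detect the @MessagingGateway interface and build a proxy for it.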
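The "try/catch" analogy can be sketched in plain Java, without any Spring types. This is only a model of what the gateway does conceptually: it invokes the downstream handler and, on failure, hands the failed message and its cause to an error channel instead of letting the exception propagate to the caller. The names `TryCatchGatewaySketch`, `Handler`, `ErrorMessage` and `gateway` are hypothetical. In Spring Integration itself, the message sent to a gateway's error channel carries a MessagingException that exposes the failed message and the cause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a mid-flow gateway acting as try/catch; names are illustrative. */
final class TryCatchGatewaySketch {

    /** Stand-in for the downstream service method, which may throw. */
    @FunctionalInterface
    interface Handler<T> {
        void handle(T message) throws Exception;
    }

    /** Stand-in for what lands on the error channel: the failed message and the cause. */
    static final class ErrorMessage<T> {
        final T failedMessage;
        final Exception cause;

        ErrorMessage(T failedMessage, Exception cause) {
            this.failedMessage = failedMessage;
            this.cause = cause;
        }
    }

    /**
     * Wraps the downstream handler. Failures are routed to errorChannel
     * instead of propagating back to whoever sent the message.
     */
    static <T> Consumer<T> gateway(Handler<T> downstream,
                                   Consumer<ErrorMessage<T>> errorChannel) {
        return message -> {
            try {
                downstream.handle(message);       // the "try" part
            } catch (Exception e) {               // the "catch" part
                errorChannel.accept(new ErrorMessage<>(message, e));
            }
        };
    }

    public static void main(String[] args) {
        List<ErrorMessage<String>> errors = new ArrayList<>();
        // The downstream handler always fails, like fulfill() in the question.
        Consumer<String> fulfillingInput = gateway(
                redemption -> { throw new Exception("test exception"); },
                errors::add);
        fulfillingInput.accept("redemption-1");
        System.out.println(errors.size() + " message(s) on the error channel");
    }
}
```

Because the wrapper swallows the exception after routing it, the caller (here, the thread consuming from the topic) sees a normal return, which mirrors why the error channel has to be named explicitly on the gateway.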

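Edit

It was just pointed out to me that another option is to add an ExpressionEvaluatingAdvice (ExpressionEvaluatingRequestHandlerAdvice in Spring Integration) to your service activator's advice chain.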

+0

Also, follow the progress on https://github.com/spring-cloud/spring-cloud-stream/issues/538 for a future solution in Spring Cloud Stream.

+0

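Thanks for the comprehensive answer; I am still digesting it. I have added my Channels.class, and I like the suggestion of the advice-based approach, because I am also interested in stateful retry behavior. I am a little confused by some points in your answer. You mentioned [The binder can be configured to retry and send exceptions to a dead-letter topic; see this PR in 1.0.0.RELEASE.]. I am using 1.0.0.RC2, which seems to predate the PR. When will 1.0.0.RELEASE be released?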

+0

[It was released last week](https://spring.io/blog/2016/05/10/spring-cloud-stream-1-0-0-release-is-available).
