Spring Cloud Stream @ServiceActivator does not send messages to errorChannel on exception

I'm using Spring Cloud Stream through spring-cloud-starter-stream-kafka. I have bound my channels to Kafka topics in application.properties as follows:
spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12
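I cannot get my program to publish a message to the error channel when an exception is thrown. Surprisingly, it does not even seem to try, even though the handler runs on a different thread (I have a @MessagingGateway that dumps messages into gatewayOutput, and the rest of the flow then happens asynchronously). Here is the definition of my ServiceActivator: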
@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements
        FulfillingService {

    @Override
    @Audit(value = "annotatedEvent")
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
    public void fulfill(TrivialRedemption redemption) throws Exception {
        logger.error("FULFILLED!!!!!!");
        throw new Exception("test exception");
    }
}
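To make the expectation concrete, here is a minimal plain-Java sketch of the behavior I was hoping for: an exception thrown by the handler is caught and forwarded to an error sink rather than only being logged. It deliberately uses no Spring types. `ErrorRoutingSketch`, `Failure`, `ThrowingHandler`, and `routeErrors` are hypothetical names invented for illustration, and this is not how Spring Integration or the Kafka binder actually implements error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ErrorRoutingSketch {

    /** Hypothetical stand-in for an error message: the failed payload plus its cause. */
    public static final class Failure {
        public final Object payload;
        public final Exception cause;

        public Failure(Object payload, Exception cause) {
            this.payload = payload;
            this.cause = cause;
        }
    }

    /** A handler that may throw a checked exception, like fulfill(...) above. */
    public interface ThrowingHandler<T> {
        void handle(T payload) throws Exception;
    }

    /**
     * Wraps a handler so that any exception it throws is passed to errorSink
     * (together with the payload) instead of propagating to the caller.
     */
    public static <T> Consumer<T> routeErrors(ThrowingHandler<T> handler,
                                              Consumer<Failure> errorSink) {
        return payload -> {
            try {
                handler.handle(payload);
            } catch (Exception e) {
                errorSink.accept(new Failure(payload, e));
            }
        };
    }

    public static void main(String[] args) {
        // The list plays the role of an error channel subscriber.
        List<Failure> errors = new ArrayList<>();
        Consumer<String> guarded = routeErrors(
                p -> { throw new Exception("test exception"); },
                errors::add);

        guarded.accept("order-1");
        System.out.println(errors.size() + " failure(s): "
                + errors.get(0).cause.getMessage());
    }
}
```

In the sketch the caller of `guarded.accept(...)` never sees the exception; the only evidence of the failure is what the error sink receives, which is the role I expected errorChannel to play.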
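Here is the resulting log (I have truncated the full exception). There is no...
- complaint about errorChannel having no subscribers
- Kafka producer thread logging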
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud[email protected]2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.spr[email protected]64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [[email protected][endpoints=[[email protected]],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {}
... ...
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
Edit: here is the content of my Channels class:
public interface Channels {

    public static final String GATEWAY_OUTPUT = "gatewayOutput";
    public static final String ENRICHING_INPUT = "enrichingInput";
    public static final String ENRICHING_OUTPUT = "enrichingOutput";
    public static final String REDEEMING_INPUT = "redeemingInput";
    public static final String REDEEMING_OUTPUT = "redeemingOutput";
    public static final String FULFILLING_INPUT = "fulfillingInput";
    public static final String FULFILLING_OUTPUT = "fulfillingOutput";

    @Output(GATEWAY_OUTPUT)
    MessageChannel gatewayOutput();

    @Input(ENRICHING_INPUT)
    MessageChannel enrichingInput();

    @Output(ENRICHING_OUTPUT)
    MessageChannel enrichingOutput();

    @Input(REDEEMING_INPUT)
    MessageChannel redeemingInput();

    @Output(REDEEMING_OUTPUT)
    MessageChannel redeemingOutput();

    @Input(FULFILLING_INPUT)
    MessageChannel fulfillingInput();

    @Output(FULFILLING_OUTPUT)
    MessageChannel fulfillingOutput();
}
Also, follow the progress at https://github.com/spring-cloud/spring-cloud-stream/issues/538 for a future solution in Spring Cloud Stream.
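Thanks for the comprehensive answer; I'm still digesting it. I have added my Channels.class, and I like the suggestion of taking an advice-based approach, because I am also interested in stateful retry behavior. I'm a bit confused by some points in your answer. You mention [the binder can be configured to retry and send exceptions to a dead-letter topic; see this PR in 1.0.0.RELEASE.]. I'm using 1.0.0.RC2, which seems to predate that PR. When will 1.0.0.RELEASE be released?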
[It was released last week](https://spring.io/blog/2016/05/10/spring-cloud-stream-1-0-0-release-is-available).
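The retry-then-dead-letter idea discussed in these comments can be illustrated with a small plain-Java sketch. It is only an in-process, stateless retry loop under assumed names (`RetryThenDeadLetterSketch`, `deliver`, and `deadLetter` are hypothetical), not the binder's implementation and not the stateful retry mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetryThenDeadLetterSketch {

    /** A handler that may throw a checked exception. */
    public interface ThrowingHandler<T> {
        void handle(T payload) throws Exception;
    }

    /**
     * Invokes handler up to maxAttempts times. If every attempt throws,
     * the payload is handed to deadLetter. Returns the number of attempts made.
     */
    public static <T> int deliver(T payload,
                                  ThrowingHandler<T> handler,
                                  int maxAttempts,
                                  Consumer<T> deadLetter) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.handle(payload);
                return attempt; // success, no dead-lettering
            } catch (Exception e) {
                // Swallow and retry; a real system would log and back off here.
            }
        }
        deadLetter.accept(payload); // all attempts failed
        return maxAttempts;
    }

    public static void main(String[] args) {
        // The list stands in for a dead-letter topic.
        List<String> deadLetters = new ArrayList<>();
        int attempts = deliver("order-1",
                p -> { throw new Exception("test exception"); },
                3,
                deadLetters::add);
        System.out.println("attempts=" + attempts + ", deadLetters=" + deadLetters);
    }
}
```

A handler that fails once and then succeeds would return after the second attempt and leave the dead-letter list empty, which is the case retry is meant to absorb.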