2017-08-17 124 views
1

我使用Spring集成的DSL實現。 我有下面的代碼,我不能使用我的自定義錯誤流。當authenticate方法拋出運行時異常時,errorChannel開始處理。我豐富頭文件來使用我的自定義錯誤流,但不能使用。彈簧集成DSL自定義錯誤通道不起作用

// In Class - 1 
@Bean 
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) { 

     MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway(); 
     wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input")); 
     wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input")); 
     wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorChannel")); 
     wsInboundGateway.setMarshaller(marshaller); 
     wsInboundGateway.setUnmarshaller(marshaller); 
     return wsInboundGateway; 
    } 


// In Class - 2 
@Bean 
    public IntegrationFlow incomingRequest() { 
     return f -> f.<Object, Class<?>>route(t -> t.getClass(), 
       mapping -> mapping.subFlowMapping(payloadType1(), 
         sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional)) 
         .subFlowMapping(payloadType2(), 
           sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)), 
         conf -> conf.id("router:Incoming request router")); 
    } 

// In Class - 3 
    @Bean 
    public IntegrationFlow type1() { 
     IntegrationFlow integrationFlow = f -> f 
       .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "error222", true)) 
       .<Type1>handle((p, h) -> authentication.authenticate(p), 
         conf -> conf.id("service-activator:Authenticate")) 
       .transform(transformer::transformType1MsgToDataX, 
         conf -> conf.id("transform:Unmarshall type1 Message")) 
       .enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id") 
         .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType")) 
       .handle((GenericHandler<DataX>) repository::successResponseMessage, 
         conf -> conf.id("service-activator:return success")) 
       .channel("outgoingResponse.input") 
       ; 

     return integrationFlow; 
    } 

// In Class - 3 
@Bean 
    public IntegrationFlow error222Flow() { 

     return IntegrationFlows.from("error222").handle("repository", "failureResponseMessage").get() 

       ; 

    } 

編輯:

阿爾喬姆的答案後,我的代碼如下圖所示。但是,我仍然無法訪問錯誤流中的標題參數。我得到的錯誤 - 「沒有通道通過路由器解決‘路由器:錯誤響應準備’」

// In Class - 1 
@Bean 
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) { 

     MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway(); 
     wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input")); 
     wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input")); 
     wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorResponse.input")); 
     wsInboundGateway.setMarshaller(marshaller); 
     wsInboundGateway.setUnmarshaller(marshaller); 
     return wsInboundGateway; 
    } 


// In Class - 2 
@Bean 
    public IntegrationFlow incomingRequest() { 
     return f -> f.<Object, Class<?>>route(t -> t.getClass(), 
       mapping -> mapping.subFlowMapping(payloadType1(), 
         sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional)) 
         .subFlowMapping(payloadType2(), 
           sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)), 
         conf -> conf.id("router:Incoming request router")); 
    } 

// In Class - 2 
@Bean 
public IntegrationFlow errorResponse(){ 
    return f -> f.<MessageHandlingException, Object>route(t -> t.getFailedMessage().getHeaders().get("ABCDEF"), 
         mapping -> mapping.subFlowMapping("ABCDEF", 
           sf -> sf.gateway("customError.input", ConsumerEndpointSpec::transactional)), 
           conf -> conf.id("router:error response prepare")); 
} 

// In Class - 3 
    @Bean 
    public IntegrationFlow type1() { 
     IntegrationFlow integrationFlow = f -> f 
       .enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true)) 
       .<Type1>handle((p, h) -> authentication.authenticate(p), 
         conf -> conf.id("service-activator:Authenticate")) 
       .transform(transformer::transformType1MsgToDataX, 
         conf -> conf.id("transform:Unmarshall type1 Message")) 
       .enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id") 
         .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType")) 
       .handle((GenericHandler<DataX>) repository::successResponseMessage, 
         conf -> conf.id("service-activator:return success")) 
       .channel("outgoingResponse.input") 
       ; 

     return integrationFlow; 
    } 

// In Class - 3 
@Bean 
    public IntegrationFlow customError(){ 
     return f -> f.handle((GenericHandler<MessageHandlingException>)eventRepository::failureResponseMessage, 
           conf -> conf.id("service-activator:return failure")); 
    } 

編輯 - 2:

我嘗試阿爾喬姆的測試代碼,它工作在這種情況下。如果我將type1流轉換爲子流映射如下(我這樣做,因爲我懷疑我的子流代碼塊),錯誤流不能打印ABCDEF參數值。 之後,我將另一個標題(XYZTWR)添加到子流映射中,但不能打印。

@Bean 
public IntegrationFlow type1() { 
    return f -> f.<String, String>route(t -> t.toString(), mapping -> mapping.subFlowMapping("foo", 
      sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional).enrichHeaders(h -> h.header("XYZTRW", "XYZTRW", true)))); 
} 

@Bean 
public IntegrationFlow fooFlow() { 
    return f -> f.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true)) 
      .handle((p, h) -> { 
       throw new RuntimeException("intentional"); 
      }); 
} 

我S.OUT是:

GenericMessage [payload=foo, headers={history=testGateway,type1.input, id=1fad7a65-4abe-c41d-0b22-36839a103269, timestamp=1503029553071}] 
+0

我用spring集成4.3.11和spring整合java dsl 1.2.2。 – user2286211

回答

0

errorChannel頭開始工作時,我們轉移消息發送給不同的線程執行人或隊列信道。否則標準throwtry...catch在相同的調用堆棧中工作。

因此,在您的情況下,認證異常只是引發到調用者 - WS入站網關。在這裏你已經配置了全局錯誤通道。

我做了這個測試:

@Configuration 
@EnableIntegration 
@IntegrationComponentScan 
public static class ContextConfiguration { 

    @Bean 
    public IntegrationFlow errorResponse() { 
     return IntegrationFlows.from(errorChannel()) 
        .<MessagingException, Message<?>>transform(MessagingException::getFailedMessage, 
          e -> e.poller(p -> p.fixedDelay(100))) 
        .get(); 
    } 

    @Bean 
    public IntegrationFlow type1() { 
      return f -> f 
        .enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true)) 
        .handle((p, h) -> { throw new RuntimeException("intentional"); }); 
    } 

    @Bean 
    public PollableChannel errorChannel() { 
     return new QueueChannel(); 
    } 
} 

@MessagingGateway(errorChannel = "errorChannel", defaultRequestChannel = "type1.input") 
public interface TestGateway { 

    Message<?> sendTest(String payload); 

} 

... 

@Autowired 
private TestGateway testGateway; 

@Test 
public void testErrorChannel() { 
    Message<?> message = this.testGateway.sendTest("foo"); 
    System.out.println(message); 
} 

而且我SOUT顯示我:

GenericMessage [payload=foo, headers={ABCDEF=ABCDEF, id=ae5d2d44-46b7-912d-17d4-bf2ee656140a, timestamp=1502999446725}] 

請,請爲org.springframework.integration類別DEBUG日誌記錄級別,並觀察在該步驟中你的消息是失去希望的頭。

UPDATE

確定。我看到你的問題。因爲您使用的是sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional),換句話說,您通過網關呼叫下游,您所做的所有事情都在門後,並且只有在發送錯誤的情況下才能返回,這是網關的請求消息。下游failedMessage被默認吞下。

要解決這個問題,您應該考慮爲該.gateway()增加一個errorChannel()選項,並處理那裏的下游錯誤。或者...只是不要在路由器的子流程中使用.gateway(),而是簡單地使用channel映射。

.transactional()也可以配置在任何.handle()

+0

謝謝。我明白,但我的另一個問題是,我不能傳遞一個參數(如標題地圖值)到全局錯誤通道來決定我的自定義配置。 – user2286211

+0

您可以改爲在「MarshallingWebServiceInboundGateway」級別上使用自定義錯誤頻道。 –

+0

如果我可以將頭參數傳遞給全局錯誤通道,它可以幫助我,但我不能。有沒有辦法呢?我需要啓動流名稱信息來決定全局錯誤通道的過程。 – user2286211