2016-08-24 116 views
1

我有一個需求,我需要在UNIX位置連續查找文件。一旦它可用,那麼我需要解析它並轉換爲一些json格式。這需要使用Spring集成完成 - DSL。 以下是一段代碼,我從春天網站得到,但它顯示以下異常:春季集成使用DSL從Unix位置讀取文件

o.s.integration.handler.LoggingHandler: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.processFileChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 

下面是代碼:

@SpringBootApplication 
public class FileReadingJavaApplication { 

    public static void main(String[] args) { 
     new SpringApplicationBuilder(FileReadingJavaApplication.class) 
      .web(false) 
      .run(args); 
    } 

    @Bean 
    public IntegrationFlow fileReadingFlow() { 
     return IntegrationFlows 
        .from(s -> s.file(new File("Y://")) 
           .patternFilter("*.txt"), 
          e -> e.poller(Pollers.fixedDelay(1000))) 
        .transform(Transformers.fileToString()) 
        .channel("processFileChannel") 
        .get(); 
     } 

} 

新代碼:

@SpringBootApplication 公共類SpringIntegration {

public static void main(String[] args) { 
    new SpringApplicationBuilder(SpringIntegration.class) 
    .web(false) 
    .run(args); 
} 

@Bean 
public SessionFactory<LsEntry> sftpSessionFactory() { 
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); 
    factory.setHost("ip"); 
    factory.setPort(port); 
    factory.setUser("username"); 
    factory.setPassword("pwd"); 
    factory.setAllowUnknownKeys(true); 
    return new CachingSessionFactory<LsEntry>(factory); 
} 

@Bean 
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { 
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); 
    fileSynchronizer.setDeleteRemoteFiles(false); 
    fileSynchronizer.setRemoteDirectory("remote dir"); 
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt")); 

    return fileSynchronizer; 
} 
@Bean 
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) 
public MessageSource ftpMessageSource() { 
    SftpInboundFileSynchronizingMessageSource source = 
      new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); 
    source.setLocalFilter(new AcceptOnceFileListFilter<File>()); 
    source.setLocalDirectory(new File("Local directory")); 

    return source; 
} 

@Bean 
@ServiceActivator(inputChannel = "fileInputChannel") 
public MessageHandler handler() { 
    return new MessageHandler() { 


     @Override 
     public void handleMessage(Message<?> message) throws MessagingException { 
      System.out.println("File Name : "+message.getPayload()); 

     } 

    }; 
} 

@Be public static StandardIntegrationFlow processFileFlow(){ return IntegrationFlows .from(「fileInputChannel」)。split() .handle(「fileProcessor」,「process」)。get();

} 

@Bean 
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000")) 
public MessageSource<File> fileReadingMessageSource() { 
    AcceptOnceFileListFilter<File> filters =new AcceptOnceFileListFilter<>(); 

    FileReadingMessageSource source = new FileReadingMessageSource(); 
    source.setAutoCreateDirectory(true); 
    source.setDirectory(new File("Local directory")); 
    source.setFilter(filters); 

    return source; 
} 
@Bean 
public FileProcessor fileProcessor() { 
    return new FileProcessor(); 
} 


@Bean 
    @ServiceActivator(inputChannel = "fileInputChannel") 
    public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) { 
     AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); 
     outbound.setExpectReply(true); 
     outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo' 
     return outbound; 
    } 



    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel") 
    public interface MyGateway { 
     String sendToRabbit(String data); 

    } 

}

FileProcessor:

公共類FileProcessor {

public void process(Message<String> msg) { 
    String content = msg.getPayload(); 
    JSONObject jsonObject ; 
    Map<String, String> dataMap = new HashMap<String, String>(); 
    for(int i=0;i<=content.length();i++){ 
    String userId = content.substring(i+5,i+16); 


    dataMap = new HashMap<String, String>(); 

    dataMap.put("username", username.trim()); 


    i+=290; //each record of size 290 in file 
    jsonObject = new JSONObject(dataMap); 
    System.out.println(jsonObject); 

    } 

} 

}

回答

0

你的代碼是正確的,但一個例外告訴你,有需要的東西是什麼將從直接通道「processFileChannel」中讀取消息。

請詳細瞭解Spring Integration Reference Manual中不同的通道類型。

編輯

一個在Spring集成一等公民是MessageChannel抽象。有關更多信息,請參閱EIP

定義如.channel("processFileChannel")表示聲明DirectChannel。這種渠道意味着在發送上接收消息,並在send過程中直接執行直接。在原始的Java語言中,它可能聽起來像:從另一個服務調用一個服務。如果另一個未被自動佈線,則投擲NPE

因此,如果您使用DirectChannel作爲輸出,則應聲明用戶爲。我不知道你的邏輯是什麼,但它是如何工作的,沒有別的選擇來修復Dispatcher has no subscribers for channel

儘管您可以使用其他MessageChannel類型。但爲此目的,您應該閱讀更多文檔,例如馬克費舍爾的Spring Integration in Action

+0

其實我是Spring Integration的新手。這個概念有點棘手,如果你能根據我的需求告訴我需要做的改變將會很有幫助。 – user

+0

您所做的只是將文件內容發送到「processFileChannel」 - 但您需要一些消耗該數據的內容 - 其他一些以該頻道開頭的流程。 –

+0

請在我的答案中找到一個'EDIT'。 –