2017-07-24 66 views
1

SftpInboundFileSynchronizer not synchronizing

I have the following SFTP file synchronizer:

@Bean 
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { 
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); 
    fileSynchronizer.setDeleteRemoteFiles(false); 
    fileSynchronizer.setRemoteDirectory(applicationProperties.getSftpDirectory()); 
    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<ChannelSftp.LsEntry>(); 
    compositeFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(store, "sftp")); 
    compositeFileListFilter.addFilter(new SftpSimplePatternFileListFilter(applicationProperties.getLoadFileNamePattern())); 
    fileSynchronizer.setFilter(compositeFileListFilter); 
    fileSynchronizer.setPreserveTimestamp(true); 
    return fileSynchronizer; 
} 

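When the application first runs, it synchronizes the local directory with the remote SFTP directory. However, it does not pick up any subsequent changes to the files in the remote SFTP directory.

It is scheduled to poll as follows: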

@Bean 
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata")) 
public SftpInboundFileSynchronizingMessageSource sftpMessageSource() { 
    SftpInboundFileSynchronizingMessageSource source = 
      new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); 
    source.setLocalDirectory(applicationProperties.getScheduledLoadDirectory()); 
    source.setAutoCreateLocalDirectory(true); 
    ChainFileListFilter<File> chainFileFilter = new ChainFileListFilter<File>(); 
    chainFileFilter.addFilter(new LastModifiedFileListFilter()); 
    FileSystemPersistentAcceptOnceFileListFilter fs = new FileSystemPersistentAcceptOnceFileListFilter(store, "dailyfilesystem"); 
    fs.setFlushOnUpdate(true); 
    chainFileFilter.addFilter(fs); 
    source.setLocalFilter(chainFileFilter); 
    source.setCountsEnabled(true); 
    return source; 
} 

@Bean 
public PollerMetadata pollerMetadata(RetryCompoundTriggerAdvice retryCompoundTriggerAdvice) { 
    PollerMetadata pollerMetadata = new PollerMetadata(); 
    List<Advice> adviceChain = new ArrayList<Advice>(); 
    adviceChain.add(retryCompoundTriggerAdvice); 
    pollerMetadata.setAdviceChain(adviceChain); 
    pollerMetadata.setTrigger(compoundTrigger()); 
    pollerMetadata.setMaxMessagesPerPoll(1); 
    return pollerMetadata; 
} 

@Bean 
public CompoundTrigger compoundTrigger() { 
    CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger()); 
    return compoundTrigger; 
} 

@Bean 
public CronTrigger primaryTrigger() { 
    return new CronTrigger(applicationProperties.getSchedule()); 
} 

@Bean 
public PeriodicTrigger secondaryTrigger() { 
    return new PeriodicTrigger(applicationProperties.getRetryInterval()); 
} 

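In the afterReceive method of RetryCompoundTriggerAdvice, which extends AbstractMessageSourceAdvice, I get a null result on every poll after the first run.

How can I configure the synchronizer so that it synchronizes periodically, and not just once at application startup?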

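Update

I found that when the SFTP site has no files in its directory at the time my application starts, the SftpInboundFileSynchronizer synchronizes at every polling interval, and I can see the com.jcraft.jsch log statements on each poll. But as soon as a file is found on the SFTP site, it synchronizes to fetch that file locally and then never synchronizes again.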

Update 2

My apologies... here is the custom code:

@Component 
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice { 

    private final static Logger logger = LoggerFactory.getLogger(RetryCompoundTriggerAdvice.class); 

    private final CompoundTrigger compoundTrigger; 

    private final Trigger override; 

    private final ApplicationProperties applicationProperties; 

    private final Mail mail; 

    private int attempts = 0; 

    private boolean expectedMessage; 
    private boolean inProcess; 

    public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, 
      @Qualifier("secondaryTrigger") Trigger override, 
      ApplicationProperties applicationProperties, 
      Mail mail) { 
     this.compoundTrigger = compoundTrigger; 
     this.override = override; 
     this.applicationProperties = applicationProperties; 
     this.mail = mail; 
    } 

    @Override 
    public boolean beforeReceive(MessageSource<?> source) { 
     logger.debug("!inProcess is " + !inProcess); 
     return !inProcess; 
    } 

    @Override 
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) { 

     if (expectedMessage) { 
      logger.info("Received expected load file. Setting cron trigger."); 
      this.compoundTrigger.setOverride(null); 
      expectedMessage = false; 
      return result; 
     } 

     final int maxOverrideAttempts = applicationProperties.getMaxFileRetry(); 

     attempts++; 
     if (result == null && attempts < maxOverrideAttempts) { 
      logger.info("Unable to find file after " + attempts + " attempt(s). Will reattempt"); 
      this.compoundTrigger.setOverride(this.override); 
     } else if (result == null && attempts >= maxOverrideAttempts) { 
      String message = "Unable to find daily file" + 
        " after " + attempts + 
        " attempt(s). Will not reattempt since max number of attempts is set at " + 
        maxOverrideAttempts + "."; 
      logger.warn(message); 
      mail.sendAdminsEmail("Missing Load File", message); 
      attempts = 0; 
      this.compoundTrigger.setOverride(null); 
     } else { 
      attempts = 0; 
      // keep periodically checking until we are certain 
      // that this message is the expected message 
      this.compoundTrigger.setOverride(this.override); 
      inProcess = true; 
      logger.info("Found load file"); 
     } 
     return result; 
    } 

    public void foundExpectedMessage(boolean found) { 
     logger.debug("Expected message was found? " + found); 
     this.expectedMessage = found; 
     inProcess = false; 
    } 

} 
+0

We need to see your 'RetryCompoundTriggerAdvice' logic. It looks like after the first success you put it into a state where it somehow no longer polls. –

+0

So, you use some custom code in your solution and you don't share it with us; expect down-votes and only approximate suggestions for solving the problem. –

+0

My apologies... the custom code has been added. – James

Answer

1

Your logic:

@Override 
public boolean beforeReceive(MessageSource<?> source) { 
    logger.debug("!inProcess is " + !inProcess); 
    return !inProcess; 
} 

Let's look at its JavaDoc:

/**
 * Subclasses can decide whether to proceed with this poll.
 * @param source the message source.
 * @return true to proceed.
 */
public abstract boolean beforeReceive(MessageSource<?> source); 

And the logic around this method:

Message<?> result = null;
if (beforeReceive((MessageSource<?>) target)) {
    result = (Message<?>) invocation.proceed();
}
return afterReceive(result, (MessageSource<?>) target);

So, invocation.proceed() (the SFTP synchronization) is called only if beforeReceive() returns true. In your case, that happens only while inProcess is false.

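In your afterReceive() implementation, as soon as you get a non-null result (the first successful attempt), you set inProcess = true;. It looks like you reset it back to false only when somebody calls foundExpectedMessage().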
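To make the effect concrete, here is a minimal, framework-free sketch of that gate. FakeSource, GateAdvice and poll are hypothetical stand-ins invented for this illustration, not Spring Integration classes; they keep only the inProcess flag from the advice in the question and the proceed-only-if-beforeReceive shape quoted above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class GateDemo {

    /** Hypothetical stand-in for the message source; counts real receive() calls. */
    static class FakeSource {
        final Deque<String> files = new ArrayDeque<>();
        int receiveCalls = 0;

        String receive() {
            receiveCalls++;
            return files.poll(); // null when no file is available
        }
    }

    /** Keeps only the inProcess gate from the advice in the question. */
    static class GateAdvice {
        private boolean inProcess;

        boolean beforeReceive() {
            return !inProcess;
        }

        String afterReceive(String result) {
            if (result != null) {
                inProcess = true; // gate stays closed until foundExpectedMessage()
            }
            return result;
        }

        void foundExpectedMessage() {
            inProcess = false;
        }
    }

    /** Same shape as the quoted logic: proceed only if beforeReceive() is true. */
    static String poll(GateAdvice advice, FakeSource source) {
        String result = null;
        if (advice.beforeReceive()) {
            result = source.receive();
        }
        return advice.afterReceive(result);
    }

    public static void main(String[] args) {
        FakeSource source = new FakeSource();
        GateAdvice advice = new GateAdvice();
        source.files.add("a.csv");
        source.files.add("b.csv");

        System.out.println(poll(advice, source));  // prints a.csv
        System.out.println(poll(advice, source));  // prints null: receive() was skipped
        System.out.println(source.receiveCalls);   // prints 1
        advice.foundExpectedMessage();             // reopen the gate
        System.out.println(poll(advice, source));  // prints b.csv
    }
}
```

With no files on the remote side the result is always null, inProcess never becomes true, and every poll goes through; that matches the behaviour described in the first update.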

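So, what do you expect from us as an answer to your question? The problem really is in your custom code and has nothing to do with the framework. Sorry...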

+1

Thanks for your help. You are right. It turns out I was never calling 'foundExpectedMessage', so the flag was never reset. Sorry I didn't see that. – James
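One way to avoid forgetting that call is to have whatever consumes the file report back in a finally block, so the gate reopens even when processing fails. The sketch below is only an illustration under assumptions: PollGate is a hypothetical interface exposing the advice's foundExpectedMessage(boolean) method, and the handler and its predicate are invented names. In the real application the handler would be whatever consumes messages from sftpChannel.

```java
import java.util.function.Predicate;

public class LoadFileHandlerSketch {

    /** Hypothetical view of the advice: just the callback that reopens the gate. */
    interface PollGate {
        void foundExpectedMessage(boolean found);
    }

    private final PollGate gate;
    private final Predicate<String> isExpectedFile;

    LoadFileHandlerSketch(PollGate gate, Predicate<String> isExpectedFile) {
        this.gate = gate;
        this.isExpectedFile = isExpectedFile;
    }

    /** Handles one file name and always reports back so polling can resume. */
    void handle(String fileName) {
        boolean expected = false;
        try {
            expected = isExpectedFile.test(fileName);
            // ... load the file here (omitted in this sketch) ...
        } finally {
            // Runs on success and on failure, so inProcess is always reset.
            gate.foundExpectedMessage(expected);
        }
    }

    public static void main(String[] args) {
        LoadFileHandlerSketch handler = new LoadFileHandlerSketch(
                found -> System.out.println("foundExpectedMessage(" + found + ")"),
                name -> name.endsWith(".csv"));
        handler.handle("daily.csv"); // prints foundExpectedMessage(true)
        handler.handle("notes.txt"); // prints foundExpectedMessage(false)
    }
}
```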