2017-07-31 97 views
0

我的要求如下。使用Apache Camel同步FTP遠程和FS(或HDFS)

有時,我想手動運行一個工具,將遠程FTP與我的FS(至少,HDFS)遠程同步。 應該下載每個文件,應該附加更新的文件。

我使用Apache Camel FTP2示例: https://github.com/apache/camel/tree/master/examples/camel-example-ftp 啓動該項目。在其他來源的幫助下,這是一個簡單的解決方案。

object Main{ 

    def main(args: Array[String]): Unit = { 
    val context = new DefaultCamelContext 
    context.addRoutes(FtpRoute()) 

    context.start 
    Thread.sleep(100000) 
    context.stop 
    }  
} 

case class FtpRoute() extends RouteBuilder { 

    def configure(): Unit = { // configure properties component 
    val pc = getContext.getComponent("properties", classOf[PropertiesComponent]) 
    pc.setLocation("classpath:ftp.properties") 

    val ftpSource = getContext.resolvePropertyPlaceholders(
     s"ftp://{{ftp.serv}}{{ftp.path}}?username={{ftp.user}}&password={{ftp.pass}}&passiveMode=true") 

    from(ftpSource) 
     .to("file:/tmp/ftp") 
     .log("Downloaded file ${file:name} complete.") 
    } 
} 

我可以說它的工作原理,但是......絕對不是我想要的。

  1. 如何跟蹤文件進度,以及(可能)在所有文件下載時停止Camel上下文?我需要單獨處理嗎?
  2. 來自FTP的文件被一遍又一遍地下載。馬不停蹄。爲什麼他一直在重新下載文件?
  3. 很多FTP連接正在創建中。 disconnect=true在這裏沒有幫助。你有什麼經驗嗎?

非常感謝!

回答

0

ftp組件是投票的消費者。

這樣的組件輪詢一個URI每延遲毫秒。

每個輪詢組件都會從頭到尾地獲取所有新文件。 這就是爲什麼你看到一個文件不斷redownloading。下載完所有文件後,組件將停止並等待,直到計時器或時間表發佈新事件。

  1. 您必須使用Idemponent消費者EIP(http://camel.apache.org/idempotent-consumer.html)。 我建議您使用JDBC idemponent使用者。 SQLite是我的最愛。

我的FTP選項:

disconnect=true& 
connectTimeout=300000& 
ftpClient.dataTimeout=300000& 
timeout=300000&soTimeout=300000& 
delay=3600000&reconnectDelay=10000& 
backoffErrorThreshold=1& 
maximumReconnectAttempts=3& 
backoffMultiplier=2& 
passiveMode=true& 
noop=true& 
idempotent=true& 
idempotentRepository=#jdbcDocFileIdempotentRepository& 
download=true& 
recursive=true& 
stepwise=true& 
antInclude=**/currMonth/*.zip&antExclude=**/prevMonth/*.zip& 
idempotentKey=${file:onlyname}-${file:size} 

和冪等的消費:

<bean id="jdbcDocFileIdempotentRepository" 
     class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository"> 
    <argument ref="embeddedDataSource"/> 
    <argument value="DocFileRepo"/> 
    <property name="tableExistsString" value="SELECT 1 FROM MESSAGE_REPOSITORY WHERE 1 = 0"/> 
    <property name="createString" value="CREATE TABLE MESSAGE_REPOSITORY (processorName TEXT, messageId TEXT, createdAt TIMESTAMP, result TEXT)"/> 
    <property name="queryString" value="SELECT COUNT(*) FROM MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?"/> 
    <property name="insertString" value="INSERT INTO MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, ?, ?)"/> 
    <property name="deleteString" value="DELETE FROM MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?"/> 
</bean> 

數據源:

<bean id="embeddedDataSource" class="org.sqlite.javax.SQLiteConnectionPoolDataSource"> 
    <property name="url" value="jdbc:sqlite:${embedded_db_config_path}" /> 
    <property name="encoding" value="UTF-8"/> 
    <property name="journalMode" value="WAL"/> 
</bean> 
  • 您需要設置一個合適的延遲,它需要比de大得多故障。
  • 如果延遲足夠大,則連接將通過超時關閉。當然你可以調整超時時間。
  • 修訂

    據駱駝文檔:

    選項: =使用冪等消費EIP模式讓駱駝跳過已經處理的文件真實

    選項。默認情況下會使用一個基於內存的LRUCache,其中包含1000個條目。如果noop = true,那麼idempotent也會被啓用,以避免重複使用相同的文件。

    選項:idempotentRepository

    在默認情況下使用MemoryMessageIdRepository如果沒有指定的和冪是真實的可插拔儲存庫org.apache.camel.spi.IdempotentRepository。

    所以,在默認情況下,FTP使用的內存解決方案

    +0

    所以我需要一個數據庫來告訴駱駝被下載哪些文件?有沒有內存中的解決方案?但不是h2分貝。 – Atais

    +0

    @Atais MemoryIdempotentRepository是默認解決方案,但是如果您重新啓動駱駝上下文,則會丟失所有以前下載的文件密鑰。默認情況下,它包含最後的1000個鍵。您也可以使用FileIdempotentRepository和其他許多其他解決方案(您可以在這裏找到完整的解決方案列表:http://camel.apache.org/idempotent-consumer.html)。 –

    +0

    @Atais我擴展了我的答案。 –