2017-04-17 75 views
0

其實我有2個問題,我的第一個問題是:在整個文件被flume agent刷新後如何使HDFS關閉文件(例如.123456789.tmp)。事實上,這個文件從未關閉,直到我強迫水槽代理停止。 我beleive存在使用4個參數如下方法:如何使水槽加載文件爲hdfs,hdfs從不關閉文件.tmp並按名稱重命名文件。

hdfs.rollSize = 0 
hdfs.rollCount =0 
hdfs.rollInterval = 0 
hdfs.batchsize = 1000000 

好了,我的第二個問題是,我的經紀人水槽從SFTP服務器接收文件,而我需要保持每個文件名中的HDFS。它與spooldir類型正常工作,但不與SFTP!有什麼想法嗎?

的水槽劑如按照我的配置文件:

agent.sources = r1 
agent.channels = c1 
agent.sinks = k 

configure ftp source 

agent.sources.r1.type = org.keedio.flume.source.mra.source.Source 
agent.sources.r1.client.source = sftp 
agent.sources.r1.name.server = ip 
agent.sources.r1.user = user 
agent.sources.r1.password = secret 
agent.sources.r1.port = 22 
agent.sources.r1.knownHosts = ~/.ssh/known_hosts 
agent.sources.r1.work.dir = /DATA/test/flumrFTP 
agent.sources.r1.fileHeader = true 
agent.sources.r1.basenameHeader = true 
agent.sources.r1.inputCharset = ISO-8859-1 
#agent.sources.r1.batchSize = 1000 
agent.sources.r1.flushlines = true 

configure sink s1 
agent.sinks.k.type = hdfs 
agent.sinks.k.hdfs.path = hdfs://hostname:8000/user/admin/DATA/import_flume/ 
agent.sinks.k.hdfs.filePrefix = %{basename} 
agent.sinks.k.hdfs.rollCount = 0 
agent.sinks.k.hdfs.rollInterval = 0 
agent.sinks.k.hdfs.rollSize = 0 
agent.sinks.k.hdfs.useLocalTimeStamp = true 
agent.sinks.k.hdfs.batchsize = 1000000 
agent.sinks.k.hdfs.fileType = DataStream 

Use a channel which buffers events in memory 
agent.channels.c1.type = memory 
agent.channels.c1.capacity = 1000000 
agent.channels.c1.transactionCapacity = 1000000 

agent.sources.r1.channels = c1 
agent.sinks.k.channel = c1 

回答

2

嘗試設置變量

hdfs.rollInterval它的秒軋製當前文件

此設置關閉前等待的秒數文件在您設置的秒數之後。我在200秒鐘設置了我的地址,我正在加載較小的文件