2017-07-07 56 views
1

我試圖通過水槽從kafka hdfs中的數據。 kafka_producer每10秒發送一條消息。我會收集所有消息在hdfs上的一個文件。 這是水槽,我使用的配置,但是它存儲在HDFS許多文件(一個用於消息):只有一個文件到hdfs從kafka與水槽

agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource 
agent1.sources.kafka-source.zookeeperConnect = localhost:2181 
agent1.sources.kafka-source.topic = prova 
agent1.sources.kafka-source.groupId = flume 
agent1.sources.kafka-source.channels = memory-channel 
agent1.sources.kafka-source.interceptors = i1 
agent1.sources.kafka-source.interceptors.i1.type = timestamp 
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100 
agent1.channels.memory-channel.type = memory 
agent1.channels.memory-channel.capacity = 10000 
agent1.channels.memory-channel.transactionCapacity = 1000 
agent1.sinks.hdfs-sink.type = hdfs 
agent1.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/input 
agent1.sinks.hdfs-sink.hdfs.rollInterval = 5 
agent1.sinks.hdfs-sink.hdfs.rollSize = 0 
agent1.sinks.hdfs-sink.hdfs.rollCount = 0 
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream 
agent1.sinks.hdfs-sink.channel = memory-channel 
agent1.sources = kafka-source 
agent1.channels = memory-channel 
agent1.sinks = hdfs-sink 

P.S.我從一個file.csv開始。卡夫卡製作人接受文件並選擇一些感興趣的領域,然後每隔10秒發送一次。 Flume將條目存儲在hadoop hdfs上,但存儲在許多文件中(1條目= 1個文件)。我希望所有的條目都在一個文件中。如何改變水槽的配置?

+1

我認爲所有信息都在您的問題中,但它有點難以閱讀。你能添加一些結構嗎?例如1.我做什麼2.目前的結果是什麼3.期望的結果是什麼以及目前有哪些不同(理想情況是4.我嘗試過了什麼) –

+0

我修改了。我希望它解釋得更好。 – r3ll4

回答

0

看來flume確實已經設置爲在HDFS上爲每個輸入文件創建一個文件。

如建議here你可以通過編寫一個定期的豬(或mapreduce)作業來處理這個問題,該作業將所有的輸入文件合併起來。

減少文件數量的附加選項可能是降低入站文件的頻率。

0

將rollInterval設置爲0,因爲您不想基於時間創建不同的文件。如果您想基於數字輸入或事件來更改rollCount值。例如,如果要在一個單個文件中保存10個事件或條目:

agent1.sinks.hdfs-sink.hdfs.rollInterval = 0 
agent1.sinks.hdfs-sink.hdfs.rollSize = 0 
agent1.sinks.hdfs-sink.hdfs.rollCount = 10