
How do I get rid of the timestamp that Flume adds to every event in HDFS?

I have several files, each line of which contains a JSON record, that I am moving into HDFS with Flume:

[[email protected] vp_flume]# more vp_170801.txt.finished | awk '{printf("%s\n", substr($0,0,20))}' 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 
{"status":"OK","resp 

My Flume configuration is:

[[email protected] flume]# cat flume_test.conf 
agent.sources = seqGenSrc 
agent.channels = memoryChannel 
agent.sinks = loggerSink 

agent.sources.seqGenSrc.type = spooldir 
agent.sources.seqGenSrc.spoolDir = /moveitdata/dong/vp_flume 
agent.sources.seqGenSrc.deserializer.maxLineLength = 10000000 
agent.sources.seqGenSrc.fileSuffix = .finished 
agent.sources.seqGenSrc.deletePolicy = never 

agent.sources.seqGenSrc.channels = memoryChannel 
agent.sinks.loggerSink.channel = memoryChannel 
agent.channels.memoryChannel.type = memory 
agent.channels.memoryChannel.capacity = 100 

agent.sinks.loggerSink.type = hdfs 
agent.sinks.loggerSink.hdfs.path = /home/dong/vp_flume 

agent.sinks.loggerSink.hdfs.writeFormat = Text 
agent.sinks.loggerSink.hdfs.rollInterval = 0 
agent.sinks.loggerSink.hdfs.rollSize = 1000000000 
agent.sinks.loggerSink.hdfs.rollCount = 0 

The resulting files in HDFS are:

[[email protected] flume]# hadoop fs -text /home/dong/vp_flume/* | awk '{printf("%s\n", substr($0,0,20))}' | more 
1505276698665 {"stat 
1505276698665 {"stat 
1505276698666 {"stat 
1505276698666 {"stat 
1505276698666 {"stat 
1505276698667 {"stat 
1505276698667 {"stat 
1505276698667 {"stat 
1505276698668 {"stat 
1505276698668 {"stat 
1505276698668 {"stat 
1505276698668 {"stat 
1505276698669 {"stat 
1505276698669 {"stat 
1505276698669 {"stat 
1505276698669 {"stat 
1505276698670 {"stat 
1505276698670 {"stat 
1505276698670 {"stat 
1505276698670 {"stat 

Question: I don't want the timestamp that Flume prepends to every event. How can I configure Flume to get rid of it?

Answer


You have not explicitly set the hdfs.fileType property in your agent configuration file, so Flume defaults to SequenceFile. A SequenceFile supports two write formats: Text and Writable. You have set hdfs.writeFormat = Text, which means Flume serializes your events with HDFSTextSerializer. If you look at its source (line 53), you will see that it adds a timestamp as the default key.
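
For reference, the key-generation logic in that serializer looks roughly like this (a paraphrase of the linked Flume source, not a verbatim copy):

import org.apache.flume.Event;
import org.apache.hadoop.io.LongWritable;

// Sketch of HDFSTextSerializer's key generation; illustrates why a
// timestamp shows up as the SequenceFile key of every record.
class TimestampKeySketch {
    static LongWritable getKey(Event e) {
        // Use the event's "timestamp" header if present,
        // otherwise fall back to the current wall-clock time.
        String timestamp = e.getHeaders().get("timestamp");
        long eventStamp = (timestamp == null)
                ? System.currentTimeMillis()
                : Long.parseLong(timestamp);
        // This LongWritable becomes the SequenceFile key that
        // "hadoop fs -text" prints in front of every record.
        return new LongWritable(eventStamp);
    }
}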

Using hdfs.writeFormat = Writable will not help either, because it does the same thing. You can check its source here (line 52).

A SequenceFile always needs a key. So unless you have a good reason to use SequenceFile, I suggest you set hdfs.fileType = DataStream in your agent configuration.
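
For example, the sink section of the configuration above would become (an untested sketch; the source and channel sections stay the same):

agent.sinks.loggerSink.type = hdfs 
agent.sinks.loggerSink.hdfs.path = /home/dong/vp_flume 
# Write raw event bodies instead of a SequenceFile, so no key
# (the timestamp) is prepended to each record. 
agent.sinks.loggerSink.hdfs.fileType = DataStream 
agent.sinks.loggerSink.hdfs.rollInterval = 0 
agent.sinks.loggerSink.hdfs.rollSize = 1000000000 
agent.sinks.loggerSink.hdfs.rollCount = 0 

Note that hdfs.writeFormat only applies to SequenceFile records, so it can be dropped once hdfs.fileType = DataStream is set. With DataStream each event body is written as-is, so every line in HDFS should again start with {"status":"OK",... just like the input files.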