2015-09-07

I am using Flume to process log lines into HDFS and to index them into ElasticSearch with the ElasticSearchSink, but the ElasticSearchSink does not consume all of the messages.

Here is my configuration:

agent.channels.memory-channel.type = memory 

agent.sources.tail-source.type = exec 
agent.sources.tail-source.command = tail -4000 /home/cto/hs_err_pid11679.log 
agent.sources.tail-source.channels = memory-channel 

agent.sinks.log-sink.channel = memory-channel 
agent.sinks.log-sink.type = logger 

#####INTERCEPTORS 

agent.sources.tail-source.interceptors = timestampInterceptor 
agent.sources.tail-source.interceptors.timestampInterceptor.type = org.apache.flume.interceptor.TimestampInterceptor$Builder 

####SINK 
# Setting the sink to HDFS 
agent.sinks.hdfs-sink.channel = memory-channel 
agent.sinks.hdfs-sink.type = hdfs 
agent.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8020/data/flume/%y-%m-%d/ 
agent.sinks.hdfs-sink.hdfs.fileType = DataStream 
agent.sinks.hdfs-sink.hdfs.inUsePrefix =. 
agent.sinks.hdfs-sink.hdfs.rollCount = 0 
agent.sinks.hdfs-sink.hdfs.rollInterval = 0 
agent.sinks.hdfs-sink.hdfs.rollSize = 10000000 
agent.sinks.hdfs-sink.hdfs.idleTimeout = 10 
agent.sinks.hdfs-sink.hdfs.writeFormat = Text 

agent.sinks.elastic-sink.channel = memory-channel 
agent.sinks.elastic-sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink 
agent.sinks.elastic-sink.hostNames = 127.0.0.1:9300 
agent.sinks.elastic-sink.indexName = flume_index 
agent.sinks.elastic-sink.indexType = logs_type 
agent.sinks.elastic-sink.clusterName = elasticsearch 
agent.sinks.elastic-sink.batchSize = 500 
agent.sinks.elastic-sink.ttl = 5d 
agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer 


# Finally, activate. 
agent.channels = memory-channel 
agent.sources = tail-source 
agent.sinks = log-sink hdfs-sink elastic-sink 

The problem is that I only see 1-2 messages in ElasticSearch/Kibana, while there are plenty of messages in the HDFS files.

Any idea what I am missing here?


Not sure what the problem is, but I wonder whether it could be caused by using a single channel to send events to multiple sinks? [See here](https://mail-archives.apache.org/mod_mbox/flume-user/201306.mbox/%[email protected]%3E). I also learned that `agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer` should be removed, otherwise the timestamp field is created with the wrong type. –
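
The single-channel concern is a real Flume behavior: sinks draining the same channel compete for events, so each event is delivered to only one of them, not to all. To deliver every event to both HDFS and ElasticSearch, each sink needs its own channel and the source must replicate into both. A minimal sketch (the channel names `hdfs-channel` and `elastic-channel` are illustrative, not from the original config):

```
# Two channels, one per sink; the source fans out to both.
agent.channels = hdfs-channel elastic-channel
agent.channels.hdfs-channel.type = memory
agent.channels.elastic-channel.type = memory

# "replicating" is the default selector: every event is copied to each channel.
agent.sources.tail-source.channels = hdfs-channel elastic-channel
agent.sources.tail-source.selector.type = replicating

# Each sink drains its own dedicated channel.
agent.sinks.hdfs-sink.channel = hdfs-channel
agent.sinks.elastic-sink.channel = elastic-channel
```

With this layout the HDFS and ElasticSearch message counts should match, since neither sink steals events from the other.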

Answer


The problem was related to a bug in the serializer. If we drop the line:

agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer 

the messages are consumed without a problem. The issue lies in the way the @timestamp field is created when the serializer is used.
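
One way to guard against `@timestamp` being created with the wrong type is to create the index with an explicit mapping before Flume writes to it, so ElasticSearch does not infer the field type from the serialized value. This is a sketch, not part of the original answer; it assumes the node's HTTP API is reachable on the default port 9200 (the 9300 in the config is the transport port used by the sink), and takes the index and type names from the question's config:

```
# Hypothetical: pre-create flume_index with @timestamp mapped as a date
# (index name "flume_index" and type "logs_type" come from the question).
curl -XPUT 'http://127.0.0.1:9200/flume_index' -d '{
  "mappings": {
    "logs_type": {
      "properties": {
        "@timestamp": { "type": "date" }
      }
    }
  }
}'
```

Once the mapping is fixed, documents whose timestamp value cannot be parsed as a date are rejected explicitly instead of silently creating a string-typed field.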