0

這是輸入文件:如何通過LogStash過濾簡單消息ElasticSearch除以在該消息中的多個字段

{"meta":"","level":"error","message":"clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.","timestamp":"2017-04-06T16:08:37.861Z"} 
{"meta":"","level":"error","message":"clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.","timestamp":"2017-04-06T19:40:17.682Z"} 

基本上,這樣的日誌是經由Winstom模塊我的申請的NodeJS的結果。我的疑問重點在於如何調整logstash過濾器以獲得在ElasticSearch中創建的4個字段。我的目的是看到「列」(屬性或文件可能是ElasticSearch上下文中我猜的更好的詞):level(例如錯誤),message_source(例如clientErrorHandler),message_content(例如Erronão... serviços)和error_time(無需納秒)(例如,2017-04-06T19:40:17)。

我被困在這一點上:

1 - 我用這個logstash.conf

input { 
    file { 
     path => "/home/demetrio/dev/testes_manuais/ELK/logs/*" 
     start_position => "beginning" 

    } 
} 

filter { 

    grok { 
     match => { 
     "message" => '%{SYSLOG5424SD:loglevel} %{TIMESTAMP_ISO8601:Date} %{GREEDYDATA:content}' 
     } 
    } 

    date { 
    match => [ "Date", "YYYY-mm-dd HH:mm:ss.SSS" ] 
    locale => en 
    } 

} 

output { 
    stdout { 
    codec => plain { 
         charset => "ISO-8859-1" 
       } 

    } 
    elasticsearch { 
     hosts => "http://127.0.0.1:9200" 
     index => "dmz-logs-indice" 

    } 
} 

2 - 通過Kibana DevTools搜索ElasticSearch

GET _search 
{ 
    "query": { 
    "match_all": {} 
    } 
} 

,我看到:

{ 
    "took": 5, 
    "timed_out": false, 
    "_shards": { 
    "total": 6, 
    "successful": 6, 
    "failed": 0 
    }, 
    "hits": { 
    "total": 3, 
    "max_score": 1, 
    "hits": [ 
     { 
     "_index": ".kibana", 
     "_type": "config", 
     "_id": "5.3.0", 
     "_score": 1, 
     "_source": { 
      "buildNum": 14823 
     } 
     }, 
     { 
     "_index": "dmz-logs-indice", 
     "_type": "logs", 
     "_id": "AVtJLZ5x6gscWn5fxxA_", 
     "_score": 1, 
     "_source": { 
      "path": "/home/demetrio/dev/testes_manuais/ELK/logs/logs.log", 
      "@timestamp": "2017-04-07T16:09:36.996Z", 
      "@version": "1", 
      "host": "nodejs", 
      "message": """{"meta":"","level":"error","message":"clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.","timestamp":"2017-04-06T16:08:37.861Z"}""", 
      "tags": [ 
      "_grokparsefailure" 
      ] 
     } 
     }, 
     { 
     "_index": "dmz-logs-indice", 
     "_type": "logs", 
     "_id": "AVtJLZ5x6gscWn5fxxBA", 
     "_score": 1, 
     "_source": { 
      "path": "/home/demetrio/dev/testes_manuais/ELK/logs/logs.log", 
      "@timestamp": "2017-04-07T16:09:36.998Z", 
      "@version": "1", 
      "host": "nodejs", 
      "message": """{"meta":"","level":"error","message":"clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.","timestamp":"2017-04-06T19:40:17.682Z"}""", 
      "tags": [ 
      "_grokparsefailure" 
      ] 
     } 
     } 
    ] 
    } 
} 

I想我應該用一些RegularExpresss或神交以四撕成小塊分:

1 - 級 2 - 消息什麼來之前「:」 3 - 用什麼來後,消息「:」 4 - 時間戳

而且,如果可能的話,提供更好的列(字段/屬性)標籤等:

1 - 水平 2 - message_source 3 - MESSAGE_CONTENT 4 - error_time

最後刪除時間戳納秒

PS。萬一將來的某個讀者獲得感興趣的我如何在登錄的NodeJS,你在這兒:

...

var winston = require('winston'); 
winston.emitErrs = true; 

var logger = new winston.Logger({ 
    transports: [ 
     new winston.transports.File({ 
      level: 'error', 
      filename: './logs/logs.log', 
      handleExceptions: true, 
      json: true, 
      maxsize: 5242880, //5MB 
      maxFiles: 5, 
      colorize: false, 
      prettyPrint: true 
     })    
    ], 
    exitOnError: false 
}); 

... 

function clientErrorHandler(err, req, res, next) { 
     logger.log("error","clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.",err.message); 

     res.send(500, { error: 'Erro genérico!' }); 

    } 

app.use(clientErrorHandler); 

PS2:我仔細看了喜歡Filter specific Message with logstash before sending to ElasticSearch的問題,但我真的堅持

回答

1

由於您的應用程序將日誌輸出爲JSON字符串,因此可以將Logstash配置爲將日誌解析爲JSON。這與將codec => "json"添加到文件輸入配置中一樣簡單。

以下是爲您的方案的示例配置:

input { 
    file { 
    path => "/home/demetrio/dev/testes_manuais/ELK/logs/*" 
    start_position => "beginning" 
    codec => "json" 
    } 
} 

filter { 
    # This matches `timestamp` field into `@timestamp` field for Kibana to consume. 
    date { 
    match => [ "timestamp", "ISO8601" ] 
    remove_field => [ "timestamp" ] 
    } 
} 

output { 
    stdout { 
    # This codec gives your more details about the event. 
    codec => rubydebug 
    } 

    elasticsearch { 
    hosts => "http://127.0.0.1:9200" 
    index => "dmz-logs-indice" 
    } 
} 

這是樣品stdout從Logstash:

{ 
      "path" => "/home/demetrio/dev/testes_manuais/ELK/logs/demo.log", 
    "@timestamp" => 2017-04-06T19:40:17.682Z, 
     "level" => "error", 
      "meta" => "", 
     "@version" => "1", 
      "host" => "dbf718c4b8e4", 
     "message" => "clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.", 
} 
+0

感謝。我只是想念如何完成我的問題的這一部分「我的意圖是看到」列「(屬性或文件可能是ElasticSearch上下文中我猜的更好的詞):level(例如錯誤),message_source(例如。clientErrorHandler),message_content(例如Erronão...serviços)和error_time(毫秒)(例如。2017-04-06T19:40:17)「。我的最終目的是分離這些消息文本。整個消息在一個字段中。 – DemeCarvO