2014-10-30

Input JSON to Logstash: config issues?

I have the following JSON input that I want to dump into Logstash (and eventually search and dashboard in Elasticsearch/Kibana).

{"vulnerabilities":[ 
    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"}, 
    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"}, 
    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"} 
]} 

I am using the following Logstash configuration:

input {
    file {
        path => "/tmp/logdump/*"
        type => "assets"
        codec => "json"
    }
}
output { 
    stdout { codec => rubydebug } 
    elasticsearch { host => localhost } 
} 

which outputs:

{ 
     "message" => "{\"vulnerabilities\":[\r", 
     "@version" => "1", 
    "@timestamp" => "2014-10-30T23:41:19.788Z", 
      "type" => "assets", 
      "host" => "av12612sn00-pn9", 
      "path" => "/tmp/logdump/stack3.json" 
} 
{ 
     "message" => "{\"ip\":\"10.1.1.30\",\"dns\":\"z.acme.com\",\"vid\":\"12345\"},\r", 
     "@version" => "1", 
    "@timestamp" => "2014-10-30T23:41:19.838Z", 
      "type" => "assets", 
      "host" => "av12612sn00-pn9", 
      "path" => "/tmp/logdump/stack3.json" 
} 
{ 
     "message" => "{\"ip\":\"10.1.1.31\",\"dns\":\"y.acme.com\",\"vid\":\"12345\"},\r", 
     "@version" => "1", 
    "@timestamp" => "2014-10-30T23:41:19.870Z", 
      "type" => "shellshock", 
      "host" => "av1261wag2sn00-pn9", 
      "path" => "/tmp/logdump/stack3.json" 
} 
{ 
      "ip" => "10.1.1.32", 
      "dns" => "x.acme.com", 
      "vid" => "12345", 
     "@version" => "1", 
    "@timestamp" => "2014-10-30T23:41:19.884Z", 
      "type" => "assets", 
      "host" => "av12612sn00-pn9", 
      "path" => "/tmp/logdump/stack3.json" 
} 

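Obviously Logstash is treating each line as an event: it thinks {"vulnerabilities":[ is an event, and I'm guessing the trailing commas on the two subsequent nodes mess up the parsing, while the last node appears to be parsed correctly. How do I tell Logstash to parse the events inside the vulnerabilities array and to ignore the commas at the end of each line?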
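The per-line behaviour described above can be reproduced outside Logstash. The following is only a plain-Python sketch (not Logstash itself), assuming each line of the sample file is handed to a JSON parser on its own, which is roughly what a line-oriented input with a json codec ends up doing. The helper name `parses_as_json` is made up for this illustration.

```python
import json

# Sample lines copied from the question's input file.
LINES = [
    '{"vulnerabilities":[',
    '    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}',
    ']}',
]


def parses_as_json(line):
    """Return True if the line is a complete JSON document on its own."""
    try:
        json.loads(line)
        return True
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return False


results = [parses_as_json(line) for line in LINES]
for line, ok in zip(LINES, results):
    print("parsed" if ok else "failed", repr(line))
```

Only the array element without a trailing comma is valid JSON by itself, which matches the single correctly parsed event in the output above.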

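Update (2014-11-05): Following Magnus' suggestion, I added the json filter and it works perfectly. However, it does not properly parse the last line of the JSON unless I specify start_position => "beginning" in the file input block. Any ideas why not? I know the file input starts from the end of the file by default, but I would have expected mutate/gsub to handle this smoothly.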

input {
    file {
        path => "/tmp/logdump/*"
        type => "assets"
        start_position => "beginning"
    }
}
filter {
    if [message] =~ /^\[?{"ip":/ {
        mutate {
            gsub => [
                "message", "^\[{", "{",
                "message", "},?\]?$", "}"
            ]
        }
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
}
output { 
    stdout { codec => rubydebug } 
    elasticsearch { host => localhost } 
} 
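To see what the two gsub patterns in the filter above do to a single message, here is a small Python sketch that applies the same substitutions in the same order. It is an emulation, not Logstash: the function name `normalize` is hypothetical, and the two sample lines assume a file layout where the array brackets sit on the same line as an element (which is what the `^\[?{"ip":` condition suggests, but the question does not show such a file).

```python
import json
import re


def normalize(message):
    """Apply the two substitutions from the mutate/gsub block, in order."""
    message = re.sub(r'^\[\{', '{', message)     # drop a leading "["
    message = re.sub(r'\},?\]?$', '}', message)  # drop a trailing "," and/or "]"
    return message


# Hypothetical first and last lines of a file using that layout.
first = normalize('[{"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},')
last = normalize('{"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}]')

print(json.loads(first)["ip"], json.loads(last)["ip"])
```

Both results are standalone JSON objects, so a json filter reading `message` should be able to parse them.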

Answer


You could skip the json codec and use a multiline filter to join the lines into a single string that you can feed to the json filter.

filter {
    multiline {
        pattern => '^{"vulnerabilities":\['
        negate => true
        what => "previous"
    }
    json {
        source => "message"
    }
}

However, this produces the following, which is not what you want:

{ 
      "message" => "<omitted for brevity>", 
      "@version" => "1", 
     "@timestamp" => "2014-10-31T06:48:15.589Z", 
       "host" => "name-of-your-host", 
       "tags" => [ 
     [0] "multiline" 
    ], 
    "vulnerabilities" => [ 
     [0] { 
      "ip" => "10.1.1.1", 
      "dns" => "z.acme.com", 
      "vid" => "12345" 
     }, 
     [1] { 
      "ip" => "10.1.1.2", 
      "dns" => "y.acme.com", 
      "vid" => "12345" 
     }, 
     [2] { 
      "ip" => "10.1.1.3", 
      "dns" => "x.acme.com", 
      "vid" => "12345" 
     } 
    ] 
} 

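Unless there's a fixed number of elements in the vulnerabilities array, I don't think there's much more we can do with this (without resorting to a ruby filter).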
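For illustration, the post-processing that the multiline approach leaves undone looks roughly like this. It is a plain-Python sketch rather than a Logstash ruby filter, and `events_from_lines` is a made-up name: it joins the lines, parses the whole document once, and emits one event per element of the vulnerabilities array, however many there are.

```python
import json

# Sample lines copied from the question's input file.
RAW_LINES = [
    '{"vulnerabilities":[',
    '    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}',
    ']}',
]


def events_from_lines(lines):
    """Join the lines, parse the document, and yield one event per array element."""
    document = json.loads("\n".join(lines))
    for entry in document["vulnerabilities"]:
        yield dict(entry)


events = list(events_from_lines(RAW_LINES))
print(len(events), [event["ip"] for event in events])
```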

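How about applying the json filter only to lines that look like what we want and dropping the rest? Your question doesn't make it clear whether all of the log looks like this, so this may not be that useful.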

filter {
    if [message] =~ /^\s+{"ip":/ {
        # Remove trailing commas
        mutate {
            gsub => ["message", ",$", ""]
        }
        json {
            source => "message"
            remove_field => ["message"]
        }
    } else {
        drop {}
    }
}
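The logic of that filter can be tried out on the sample input with a short Python emulation (again a sketch, not Logstash; `filter_lines` and the constant names are hypothetical). One deliberate difference, which is an assumption of this sketch and not part of the answer: the question's output shows messages ending in "\r", so the sketch strips `,\s*$` instead of `,$` so that a carriage return after the comma does not prevent the match.

```python
import json
import re

# Sample lines from the question, with the "\r" endings seen in its output.
LINES = [
    '{"vulnerabilities":[\r',
    '    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},\r',
    '    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},\r',
    '    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}\r',
    ']}\r',
]

WANTED = re.compile(r'^\s+\{"ip":')    # same test as the conditional
TRAILING_COMMA = re.compile(r',\s*$')  # like ",$", but tolerant of trailing whitespace


def filter_lines(lines):
    """Keep lines that look like array elements, strip the comma, parse as JSON."""
    events = []
    for line in lines:
        if not WANTED.search(line):
            continue  # corresponds to drop {}
        events.append(json.loads(TRAILING_COMMA.sub('', line)))
    return events


events = filter_lines(LINES)
print([event["ip"] for event in events])
```

The opening and closing bracket lines are discarded, and each remaining line becomes its own event with ip, dns and vid fields.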