
logstash mysql general_logs CSV format

I want to add the MySQL general_log to logstash. I have already managed to get the MySQL log written in CSV format, and with a CSV pattern there should be nothing easier to parse. These are my general_log entries:

"2015-08-15 11:52:57","mrr[mrr] @ localhost []",4703,0,"Query","SET NAMES utf8" 
"2015-08-15 11:52:57","mrr[mrr] @ localhost []",4703,0,"Query","SELECT @@SESSION.sql_mode" 
"2015-08-15 11:52:57","mrr[mrr] @ localhost []",4703,0,"Query","SET SESSION sql_mode='NO_ENGINE_SUBSTITUTION'" 
"2015-08-15 11:52:57","mrr[mrr] @ localhost []",4703,0,"Init DB","mrr" 

Here is my logstash.conf:

input { 
     lumberjack { 
       port => 5000 
       type => "logs" 
       ssl_certificate => "/etc/pki/tls/certs/logstash_forwarder.crt" 
       ssl_key => "/etc/pki/tls/private/logstash_forwarder.key" 
     } 
} 
filter { 
     if [type] == "nginx-access" { 
       grok { 
         match => { 'message' => '%{IPORHOST:clientip} %{NGUSER:indent} %{NGUSER:agent} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request}(?: HTTP/%{NUMBER:httpversion})?|)\" %{NUMBER:answer} (?:%{NUMBER:byte}|-) (?:\"(?:%{URI:referrer}|-))\" (?:%{QS:referree}) %{QS:agent}' } 
       } 
       geoip { 
         source => "clientip" 
         target => "geoip" 
         database => "/etc/logstash/GeoLiteCity.dat" 
         add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] 
         add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] 
       } 
       mutate { 
         convert => [ "[geoip][coordinates]", "float" ] 
       } 
     } 
     if [type] == "mysql-general" { 
       csv { 
         columns => [ "@timestamp(6)", "user_host", "thready_id", "server_id", "ctype", "query" ] 
         separator => "," 
       } 
       grok { 
       match => { "user_host", "%{WORD:remoteuser}\[%{WORD:localuser}\] \@ %{IPORHOST:dbhost} \[(?:%{IPORHOST:qhost}|-)\]" } 
       } 
     } 
} 
output { 
     stdout { 
       codec => rubydebug 
     } 
     elasticsearch { 
       host => "172.17.0.5" 
       cluster => "z0z0.tk-1.5" 
       flush_size => 2000 
     } 
} 

However, the user_host column has the following format: "mrr[mrr] @ localhost []", and I would like to split it into at least two separate fields: one holding the user and another holding the host.

I have run logstash with this configuration and the events end up with a _grokparsefailure tag because of the grok parse.

When I run the configuration file through the configtest option I get the following output:

Error: Expected one of #, => at line 36, column 26 (byte 1058) after filter { 
    if [type] == "nginx-access" { 
     grok { 
      match => { 'message' => '%{IPORHOST:clientip} %{NGUSER:indent} %{NGUSER:agent} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request}(?: HTTP/%{NUMBER:httpversion})?|)\" %{NUMBER:answer} (?:%{NUMBER:byte}|-) (?:\"(?:%{URI:referrer}|-))\" (?:%{QS:referree}) %{QS:agent}' } 
     } 
     geoip { 
      source => "clientip" 
      target => "geoip" 
      database => "/etc/logstash/GeoLiteCity.dat" 
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] 
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] 
     } 
     mutate { 
      convert => [ "[geoip][coordinates]", "float" ] 
     } 
    } 
    if [type] == "mysql-general" { 
     csv { 
      columns => [ "@timestamp(6)", "user_host", "thready_id", "server_id", "ctype", "query" ] 
      separator => "," 
     } 
     grok { 
     match => { "user_host" 

Can you give me an idea of what is wrong?


Your input sets the type to "logs" and your conditionals are checking for "mysql-general". You'll want to fix that, or post your real config. –


I have corrected the config file. Does it look right now? – zozo6015

Answers


Got it working. The error was actually in the grok pattern: the first user and the last host are sometimes empty (for example, in "mrr[mrr] @ localhost []" the trailing brackets contain nothing), which is what caused the grokparsefailure, so I had to add alternation groups that also accept an empty string. The current logstash.conf looks like this:

input { 
     lumberjack { 
       port => 5000 
       type => "logs" 
       ssl_certificate => "/etc/pki/tls/certs/logstash_forwarder.crt" 
       ssl_key => "/etc/pki/tls/private/logstash_forwarder.key" 
     } 
} 
filter { 
     if [type] == "nginx-access" { 
       grok { 
         match => { 'message' => '%{IPORHOST:clientip} %{NGUSER:indent} %{NGUSER:agent} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request}(?: HTTP/%{NUMBER:httpversion})?|)\" %{NUMBER:answer} (?:%{NUMBER:byte}|-) (?:\"(?:%{URI:referrer}|-))\" (?:%{QS:referree}) %{QS:agent}' } 
       } 
       geoip { 
         source => "clientip" 
         target => "geoip" 
         database => "/etc/logstash/GeoLiteCity.dat" 
         add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ] 
         add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ] 
       } 
       mutate { 
         convert => [ "[geoip][coordinates]", "float" ] 
       } 
     } 
     if [type] == "mysql-general" { 
       csv { 
         columns => [ "@timestamp(6)", "user_host", "thready_id", "server_id", "ctype", "query" ] 
         separator => "," 
       } 
       grok { 
       match => { "user_host" => "(?:%{WORD:remoteuser}|)\[%{WORD:localuser}\] \@ %{IPORHOST:dbhost} \[(?:%{IPORHOST:qhost}|)\]" } 
       } 
     } 
} 
output { 
     stdout { 
       codec => rubydebug 
     } 
     elasticsearch { 
       host => "172.17.0.5" 
       cluster => "clustername" 
       flush_size => 2000 
     } 
} 
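
One possible refinement, noted here only as an untested sketch: if the first CSV column is given a plain field name such as log_time (a made-up name, not the "@timestamp(6)" used above), a date filter can promote the general_log time to the event @timestamp instead of relying on the ingestion time:

csv { 
    # same columns as above, only the timestamp column gets a plain name 
    columns => [ "log_time", "user_host", "thready_id", "server_id", "ctype", "query" ] 
    separator => "," 
} 
date { 
    # "2015-08-15 11:52:57" from the general_log becomes the event @timestamp 
    match => [ "log_time", "yyyy-MM-dd HH:mm:ss" ] 
} 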

Thanks for your help and suggestions.


The csv{} filter just parses, um, comma-separated values. If you want to parse fields that are in other formats, use a grok{} filter on the user_host column after the csv{} filter has created it.

EDIT: to be more explicit.

Run the csv filter first:

csv { 
    columns => [ "@timestamp(6)", "user_host", "thready_id", "server_id", "ctype", "query" ] 
    separator => "," 
} 

This should create a field for you called "user_host".

You can then run that field through a grok filter, like this (untested) one:

grok { 
    match => [ "user_host", "%{WORD:myUser}\[%{WORD}\] @ %{WORD:myHost} \[\]" ] 
} 

This would create two fields for you: myUser and myHost.


I have configured the MySQL server to log in CSV format, which is why I have the input type set to logs. And since logstash-forwarder is sending the CSV logs to logstash, I thought it was correct to set the filter to the csv parser so that the separate fields get created. However, the user_host field holds two values and I would like to have each of them in its own field, and I am not sure how to do that. Maybe a mutate split would be a good choice (see the sketch below). – zozo6015
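
For reference, a mutate-based split of user_host might look like the following untested sketch (the target field names raw_user and raw_host are made up here, and the pieces still carry the square brackets, so grok is still needed for a fully clean result):

mutate { 
    # "mrr[mrr] @ localhost []" becomes the array ["mrr[mrr]", "localhost []"] 
    split => { "user_host" => " @ " } 
} 
mutate { 
    # copy the two array elements into their own fields 
    add_field => { 
     "raw_user" => "%{[user_host][0]}" 
     "raw_host" => "%{[user_host][1]}" 
    } 
} 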


The csv filter won't be used unless 'type' is 'mysql-general'. As for further processing of the field, my answer suggests using grok{}. –


I have the exact same setup, just for nginx, and instead of using csv I parse the logs with grok, which works just fine. – zozo6015