2015-06-22 73 views
4

我一直試圖使用logstash解析我的python回溯日誌。我的日誌是這樣的:使用logstash解析包含python回溯的日誌

[pid: 26422|app: 0|req: 73/73] 192.168.1.1() {34 vars in 592 bytes} [Wed Feb 18 13:35:55 2015] GET /data => generated 2538923 bytes in 4078 msecs (HTTP/1.1 200) 2 headers in 85 bytes (1 switches on core 0) 
Traceback (most recent call last): 
    File "/var/www/analytics/parser.py", line 257, in parselogfile 
    parselogline(basedir, lne) 
    File "/var/www/analytics/parser.py", line 157, in parselogline 
    pval = understandpost(parts[3]) 
    File "/var/www/analytics/parser.py", line 98, in understandpost 
    val = json.loads(dct["events"]) 
    File "/usr/lib/python2.7/json/__init__.py", line 338, in loads 
    return _default_decoder.decode(s) 
    File "/usr/lib/python2.7/json/decoder.py", line 366, in decode 
    obj, end = self.raw_decode(s, idx=_w(s, 0).end()) 
    File "/usr/lib/python2.7/json/decoder.py", line 382, in raw_decode 
    obj, end = self.scan_once(s, idx) 
ValueError: Unterminated string starting at: line 1 column 355 (char 354) 

到目前爲止,我已經能夠解析日誌除了最後一行即

ValueError: Unterminated string starting at: line 1 column 355 (char 354) 

我使用的是多過濾器這樣做。我的logstash配置看起來像這樣:

filter { 

    multiline { 
     pattern => "^Traceback" 
     what => "previous" 
    } 

    multiline { 
     pattern => "^ " 
     what => "previous" 
    } 


    grok { 
     match => [ 
      "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}" 
      ] 
    } 

    if "_grokparsefailure" in [tags] { 
     multiline { 
      pattern => "^.*$" 
      what => "previous" 
      negate => "true" 
     } 
    } 

    if "_grokparsefailure" in [tags] { 
     grok { 
      match => [ 
        "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}" 
     ] 
      remove_tag => ["_grokparsefailure"] 
     } 
    } 
} 

但是我的最後一行不是解析。相反,它仍然給我一個錯誤,並且也使處理時間呈指數增長。有關如何解析追溯最後一行的任何建議?

+0

我以前沒見過配置有三個多行配置。通常,您會找到標識某個部分開頭的模式(對於您可能是「[pid」或「Traceback」),然後將所有內容組合到一個消息中。 –

+0

做了一些家庭作業,並使用一個多行過濾器解決它,我只需確定第一行以'['爲我的日誌標識開頭,其他行將使用多行過濾器附加。我會在這裏發佈解決方案,以防其他人需要解析python日誌。 –

+0

另外三個多線過濾器使用了太多的處理能力,並且減慢了logstash,而使用一個這樣的過濾器,它就像一個魅力! –

回答

4

嗯,我找到了一個解決方案。所以我遵循的方法是,我將忽略以'['開頭的日誌消息的開始,並且所有其他行將被添加到前一個消息的末尾。然後grok過濾器可以被應用並且回溯可以被分析。請注意,我必須應用兩個grok過濾器:

  1. 用於何時有GREEDYDATA追溯以獲取追溯。

  2. 對於沒有回溯的情況,GREEDYDATA解析失敗,我必須移除_grokparsefailure標記,然後再次申請groree而不使用GREEDYDATA。這是在if塊的幫助下完成的。

最後logstash過濾器看起來是這樣的:

filter { 

    multiline { 
     pattern => "^[^\[]" 
     what => "previous" 
    } 



    grok { 
     match => [ 
      "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}" 
     ] 
    } 

    if "_grokparsefailure" in [tags] { 
     grok { 
      match => [ 
      "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)" 
       ] 
      remove_tag => ["_grokparsefailure"] 
     } 
    } 

    else { 
     mutate { 
      convert => {"traceback" => "string"} 
     } 
    } 

    date { 
     match => ["timestamp", "dd/MM/YYYY:HH:MM:ss Z"] 
     locale => en 
    } 
    geoip { 
     source => "clientip" 
    } 
    useragent { 
     source => "agent" 
     target => "Useragent" 
    } 
} 

另外,如果你不想使用if塊檢查另一個神交模式和刪除_grokparsefailure,您可以使用第一個grok過濾器通過在match Grok過濾器數組中包含多個消息模式檢查來檢查兩種消息類型。它可以這樣做:

 grok { 
      match => [ 
      "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)", 
      "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)%{GREEDYDATA:traceback}" 
       ] 
     } 

而且還有第三種方法(可能是最優雅的一種)。它看起來像這樣:

grok { 
    match => [ 
     "message", "\[pid\: %{NUMBER:process_id:int}\|app: 0\|req: %{NUMBER}/%{NUMBER}\] %{IPORHOST:clientip} \(\) \{%{NUMBER:vars:int} vars in %{NUMBER:bytes:int} bytes\} \[%{GREEDYDATA:timestamp}\] %{WORD:method} /%{GREEDYDATA:referrer} \=\> generated %{NUMBER:generated_bytes:int} bytes in %{NUMBER} msecs \(HTTP/%{NUMBER} %{NUMBER:status_code:int}\) %{NUMBER:headers:int} headers in %{NUMBER:header_bytes:int} bytes \(%{NUMBER:switches:int} switches on core %{NUMBER:core:int}\)(%{GREEDYDATA:traceback})?" 
    ] 
} 

請注意,在此方法中,存在是可選的字段必須包含在「()?」中。這裏,(%{GREEDYDATA:traceback})?

因此,grok過濾器看到,如果該字段可用,它將被解析。否則,它將被跳過。

+0

多線看起來更好!請注意,您可以使GREEDYDATA部分在您的模式中可選。 (富)?是我認爲的一般語法。 –

+0

讓我來搜索一下,一個更簡單和更小的方法總是合理的! ;) –

+0

找到了。它不使用foo,而是我們可以在第一個grok過濾器中定義多個消息匹配模式,因此不需要if塊。編輯我的答案以引用該方法。 –