2015-04-07 108 views
1

我有一個包含文件等系列消息:logstash計算經過時間不工作

component+branch.job         2014-09-04_21:24:46 2014-09-04_21:24:49 

這是字符串,一些白色的空間,第一日期和時間,一些白色的空間和第二的日期和時間。目前我使用這種過濾:

grok { 
    match => [ "message", "%{WORD:componentName}\+%{WORD:branchName}\.%{DATA:jobType}\s+20%{DATE:dateStart}_%{TIME:timeStart}\s+20%{DATE:dateStop}_%{TIME:timeStop}" ] 
    } 
    mutate { 
    add_field => {"tmp_start_timestamp" => "20%{dateStart}_%{timeStart}"} 
    add_field => {"tmp_stop_timestamp" => "20%{dateStop}_%{timeStop}"} 
    } 
    date { 
    match => [ "tmp_start_timestamp", "YYYY-MM-dd_HH:mm:ss" ] 
    add_tag => [ "jobStarted" ] 
    } 
    date { 
    match => [ "tmp_stop_timestamp", "YYYY-MM-dd_HH:mm:ss" ] 
    target => "stop_timestamp" 
    remove_field => ["tmp_stop_timestamp", "tmp_start_timestamp", "dateStart", "timeStart", "dateStop", "timeStop"] 
    add_tag => [ "jobStopped" ] 
    } 
    elapsed { 
    start_tag => "jobStarted" 
    end_tag => "jobStopped" 
    unique_id_field => "message" 
    } 

至於結果,我收到「@timestamp」,沒有經過時間計算與日期時間數據和兩個標籤,「stop_timestamp」領域。我錯過了什麼?

UPDATE

我試圖分裂(如@Rumbles建議)事件在兩個獨立的事件,但不知何故logstash創建兩個相同的事件:

input { 
    stdin { type => "time" } 
} 
filter { 
    grok { 
    match => [ "message", "%{WORD:componentName}\+%{WORD:branchName}\.%{DATA:jobType}\s+20%{DATE:dateStart}_%{TIME:timeStart}\s+20%{DATE:dateStop}_%{TIME:timeStop}" ] 
    } 
    mutate { 
    add_field => {"tmp_start_timestamp" => "20%{dateStart}_%{timeStart}"} 
    add_field => {"tmp_stop_timestamp" => "20%{dateStop}_%{timeStop}"} 
    update => [ "type", "start" ] 
    } 
    clone { 
    clones => ["stop"] 
    } 
if [type] == "start" { 
    date { 
    match => [ "tmp_start_timestamp", "YYYY-MM-dd_HH:mm:ss" ] 
    target => ["start_timestamp"] 
    add_tag => [ "jobStarted" ] 
    } 
} 
if [type] == "stop" { 
    date { 
    match => [ "tmp_stop_timestamp", "YYYY-MM-dd_HH:mm:ss" ] 
    target => "stop_timestamp" 
    remove_field => ["tmp_stop_timestamp", "tmp_start_timestamp", "dateStart", "timeStart", "dateStop", "timeStop"] 
    add_tag => [ "jobStopped" ] 
    } 
} 
    elapsed { 
    start_tag => "jobStarted" 
    end_tag => "jobStopped" 
    unique_id_field => "message" 
    timeout => 15 
    } 
} 

output { 
    stdout { codec => rubydebug } 
} 
+1

Logstash問題[2299](https://github.com/elastic/logstash/issues/2299)似乎是添加您要查找的功能的請求。 @Rumbles關於它如何工作是正確的(必須有兩個事件)。 –

回答

1

我從來沒有使用此過濾器但是,我剛剛閱讀了文檔,並且我認爲我理解您遇到的問題。

從您的描述我相信你正試圖運行一個事件過期的過濾器,從文檔看來,過濾器預計2個事件,一個與開始時間秒與結束時間,與一個共同點id幫助過濾器識別2個事件匹配的時間:

由此過濾器管理的事件必須具有某些特定屬性。描述任務開始的事件(「開始事件」)必須包含一個等於'start_tag'的標籤。另一方面,描述任務結束的事件(「結束事件」)必須包含一個等於'end_tag'的標籤。這兩種事件都需要擁有唯一識別該特定任務的ID字段。該字段的名稱存儲在'unique_id_field'中。

每條消息都被認爲是一個事件,因此您需要將消息拆分爲兩個事件,並且每對事件都有一個唯一的標識符以幫助過濾器將它們連接在一起。這不完全是一個整潔的解決方案(將事件分成兩個事件,然後再重新連接),可能有更好的解決方案,我不知道這一點。