0

我與室壁運動分析實驗和已經解決了與它的許多問題,但實際上堅持了以下內容:計算設備秒使用室壁運動分析

其實,我有記錄的流當設備被打開,反映

device_id | timestamp | reading 1 | 2011/09/01 22:30 | 1 1 | 2011/09/01 23:00 | 0 1 | 2011/09/02 03:30 | 1 1 | 2011/09/02 03:31 | 0

  • 有關在reading場我使用1和0關:像關閉。

我試圖做到的是創建一個重定向幾秒鐘的設備已經每5分鐘窗口到另一個流上的數字看上去就像一個泵:

device_id | timestamp | reading 1 | 2011/09/01 22:35 | 300 1 | 2011/09/01 22:40 | 300 1 | 2011/09/01 22:45 | 300 1 | 2011/09/01 22:50 | 300 1 | 2011/09/01 22:55 | 300 1 | 2011/09/01 23:00 | 300 1 | 2011/09/01 23:05 | 0 1 | 2011/09/01 23:10 | 0 ...

不知道這是Kinesis Analytics可以完成的事情,我實際上可以做一個查詢SQL表的工作,但我堅持的是流數據。

回答

1

這是可能的Drools Kinesis Analytics(亞馬遜託管服務):

類型:

package com.text; 

import java.util.Deque; 

declare EventA 
    @role(event) 
    id: int; 
    timestamp: long; 
    on: boolean; 

    //not part of the message 
    seen: boolean; 
end 

declare Session 
    id: int @key; 
    events: Deque; 
end 

declare Report 
    id: int @key; 
    timestamp: long @key; 
    onInLast5Mins: int; 
end 

規則:

package com.text; 

import java.util.Deque; 
import java.util.ArrayDeque; 

declare enum Constants 

    // 20 seconds - faster to test 
    WINDOW_SIZE(20*1000); 

    value: int; 
end 

rule "Reporter" 
    // 20 seconds - faster to test 
    timer(cron:0/20 * * ? * * *) 
when 
    $s: Session() 
then 
    long now = System.currentTimeMillis(); 

    int on = 0; //how long was on 
    int off = 0; //how long was off 
    int toPersist = 0; //last interesting event 

    for (EventA a : (Deque<EventA>)$s.getEvents()) { 
     toPersist ++; 
     boolean stop = false; 
     // time elapsed since the reading till now 
     int delta = (int)(now - a.getTimestamp()); 
     if (delta >= Constants.WINDOW_SIZE.getValue()) { 
      delta = Constants.WINDOW_SIZE.getValue(); 
      stop = true; 
     } 

     // remove time already counted 
     delta -= (on+off); 
     if (a.isOn()) 
      on += delta; 
     else 
      off += delta; 

     if (stop) 
      break; 
    } 

    int toRemove = $s.getEvents().size() - toPersist; 
    while (toRemove > 0) { 
     // this event is out of window of interest - delete 
     delete($s.getEvents().removeLast()); 
     toRemove --; 
    } 

    insertLogical(new Report($s.getId(), now, on)); 
end 

rule "SessionCreate" 
when 
    // for every new EventA 
    EventA(!seen, $id: id) from entry-point events 
    // check there is no session 
    not (exists(Session(id == $id))) 
then 
    insert(new Session($id, new ArrayDeque())); 
end 

rule "SessionJoin" 
when 
    // for every new EventA 
    $a : EventA(!seen) from entry-point events 
    // get event's session 
    $g: Session(id == $a.id) 
then 
    $g.getEvents().push($a); 
    modify($a) { 
     setSeen(true), 
     setTimestamp(System.currentTimeMillis()) 
    }; 
end 
+0

我正在檢查Drools,看起來很酷,但現在想用SQL來實現它。 – codeadict

0

爲此,您可以使用SQL與Stride HTTP API。您可以將持續SQL查詢網絡鏈接在一起,訂閱更改流,以及在發生這種情況時想要採取某種任意操作時發送實時webhook。有關詳細信息,請參閱Stride API docs