0

나는 운동성 분석 실험을하고있어 그것으로 많은 문제를 해결하지만, 실제로 붙어 한 다음계산 장치 초

을 실제로 장치가 켜져있을 때 반영 레코드 스트림을 reading 분야에서 오프에 대한에 대한 I 1 사용하고

device_id | timestamp | reading 1 | 2011/09/01 22:30 | 1 1 | 2011/09/01 23:00 | 0 1 | 2011/09/02 03:30 | 1 1 | 2011/09/02 03:31 | 0

  • 0 : 같은 끕니다. 경우

    device_id | timestamp | reading 1 | 2011/09/01 22:35 | 300 1 | 2011/09/01 22:40 | 300 1 | 2011/09/01 22:45 | 300 1 | 2011/09/01 22:50 | 300 1 | 2011/09/01 22:55 | 300 1 | 2011/09/01 23:00 | 300 1 | 2011/09/01 23:05 | 0 1 | 2011/09/01 23:10 | 0 ...

    확실하지 :

는 내가 달성 노력하고있어처럼 보이는 장치가 다른 스트림에 5 분마다 창에왔다 (초)을 리디렉션 펌프를 만드는 것입니다 이것은 Kinesis Analytics로 수행 할 수있는 것이므로 실제로 SQL 테이블을 쿼리 할 수는 있지만 스트리밍 데이터라는 사실에 집착하고 있습니다.

답변

1

Drools Kinesis Analytics (아마존 관리 서비스) 가능합니다 :

유형 :

package com.text; 

import java.util.Deque; 

declare EventA 
    @role(event) 
    id: int; 
    timestamp: long; 
    on: boolean; 

    //not part of the message 
    seen: boolean; 
end 

declare Session 
    id: int @key; 
    events: Deque; 
end 

declare Report 
    id: int @key; 
    timestamp: long @key; 
    onInLast5Mins: int; 
end 

규칙 :

package com.text; 

import java.util.Deque; 
import java.util.ArrayDeque; 

declare enum Constants 

    // 20 seconds - faster to test 
    WINDOW_SIZE(20*1000); 

    value: int; 
end 

rule "Reporter" 
    // 20 seconds - faster to test 
    timer(cron:0/20 * * ? * * *) 
when 
    $s: Session() 
then 
    long now = System.currentTimeMillis(); 

    int on = 0; //how long was on 
    int off = 0; //how long was off 
    int toPersist = 0; //last interesting event 

    for (EventA a : (Deque<EventA>)$s.getEvents()) { 
     toPersist ++; 
     boolean stop = false; 
     // time elapsed since the reading till now 
     int delta = (int)(now - a.getTimestamp()); 
     if (delta >= Constants.WINDOW_SIZE.getValue()) { 
      delta = Constants.WINDOW_SIZE.getValue(); 
      stop = true; 
     } 

     // remove time already counted 
     delta -= (on+off); 
     if (a.isOn()) 
      on += delta; 
     else 
      off += delta; 

     if (stop) 
      break; 
    } 

    int toRemove = $s.getEvents().size() - toPersist; 
    while (toRemove > 0) { 
     // this event is out of window of interest - delete 
     delete($s.getEvents().removeLast()); 
     toRemove --; 
    } 

    insertLogical(new Report($s.getId(), now, on)); 
end 

rule "SessionCreate" 
when 
    // for every new EventA 
    EventA(!seen, $id: id) from entry-point events 
    // check there is no session 
    not (exists(Session(id == $id))) 
then 
    insert(new Session($id, new ArrayDeque())); 
end 

rule "SessionJoin" 
when 
    // for every new EventA 
    $a : EventA(!seen) from entry-point events 
    // get event's session 
    $g: Session(id == $a.id) 
then 
    $g.getEvents().push($a); 
    modify($a) { 
     setSeen(true), 
     setTimestamp(System.currentTimeMillis()) 
    }; 
end 
+0

나는 Drools를 검사하고 있었지만 매우 멋지지만 SQL을 사용하여 지금 그것을 달성하고 싶다. – codeadict

0

당신은 Stride HTTP API와 함께 사용하여이 SQL을 수행 할 수 있습니다. 연속 SQL 쿼리 네트워크를 연결하고 변경 사항 스트림을 구독 할 수있을뿐만 아니라 실시간 웹 훅 (webhook)을 실행할 수 있습니다. 이러한 상황이 발생하면 임의의 조치를 취할 수 있습니다. 이에 대한 자세한 내용은 Stride API docs을 참조하십시오.