2015-01-11 2 views
0

비교적 주기적으로 정적 소스를 폴링하여 Microsoft StreamInsight에서 참조 스트림을 만드는 방법은 무엇입니까?느린 움직이는 데이터를 데이터베이스에서 폴링하여 StreamInsight 참조 스트림으로 사용하는 방법?

다음은 내가 시도한 것입니다. 나는 간단한 List<UserMetaData> 여기

var referenceData = new List<UserMetaData>() 
    { 
     new UserMetaData() { UserId = 1, Name = "Fred Jones", Location = "Seattle" }, 
     new UserMetaData() { UserId = 2, Name = "Bob Murphy", Location = "Portland" } 
    }; 

으로 사용자 메타 데이터의 데이터베이스를 표현하고있어하면 UserMetaData 클래스

public class UserMetaData 
{ 
    public int UserId { get; set; } 
    public string Name { get; set; } 
    public string Location { get; set; } 

    public override string ToString() 
    { 
     return string.Format(
      "Name: {0}, ID: {1}, Location: {2}", 
      this.Name, 
      this.UserId, 
      this.Location); 
    } 
} 

나머지 예제 코드의 나머지 부분은 표준 StreamInsight를 내장 배포 설정에서 줄임표를 대체에게 있습니다. 나는 5 분 대신 이초이 하트 비트 간격 만들 수있는 실제 응용 프로그램에서

var heartbeat = app.DefineObservable(
         () => Observable.Interval(TimeSpan.FromSeconds(2))); 

: 먼저

using (var server = Server.Create("default")) 
{ 
    var app = server.CreateApplication("app"); 
    // ... 
} 

, 나는 이런 하트 비트를 만들 수 있습니다. 어쨌든, 다음 나는 heatbeat 새로운 사용자 메타 데이터에 대한 데이터베이스 조회를 트리거 할 :

var newUserMeta = app.DefineObservable(
         () => heartbeat.SelectMany(_ => referenceData)) 
        .ToPointStreamable(
         c => PointEvent.CreateInsert(DateTime.Now, c), 
         AdvanceTimeSettings.IncreasingStartTime); 

IQbservable.SelectMany 확장 내가 referenceData에서 기대하는 IEnumerable<UserMetaData>을 평평하게한다. _ 매개 변수는 하트 비트에서 방출되는 long을 버립니다. 그런 다음 ToPointStreamableIObservable<UserMetaData>을 시작 시간이 지금 인 포인트 이벤트로 IQStreamable으로 변환합니다. (DateTime.Now는 아마도 StreamInsight-y가 아닐 것입니다.)

그런 다음 신호로 변환하고 간단한 쿼리를 통해 콘솔 싱크를 정의하고 배포합니다.

// Convert to signal 
var metaDataSignal = refStream 
        .AlterEventDuration(e => TimeSpan.MaxValue) 
        .ClipEventDuration(refStream, (e1, e2) => e1.Name == e2.Name); 

// Query 
var result = from t in metaDataSignal 
       select t; 

// Define & deploy sink. 
var sink = app.DefineObserver(
        () => Observer.Create<UserMetaData>(c => Console.WriteLine(c))); 
sink.Deploy("sink"); 

내 마지막 단계는 싱크대 Bind입니다. 또한 다음 화면으로 인쇄 내 메타 데이터 폴링 하트 비트의 출력을보고 내 데이터베이스에 새 UserMetaData 레코드를 추가하고 변경 사항이 반영되어 있는지 확인하기 위해 기다리는 몇 초 기다립니다.

using (var process = result.Bind(sink).Run("process")) 
{ 
    Thread.Sleep(4000); 

    referenceData.Add(new UserMetaData() 
          { 
           UserId = 3, 
           Name = "Voqk", 
           Location = "Houston" 
          }); 

    Console.ReadLine(); 
} 

새로운 UserMetaData 레코드 출력에 반영되지 않습니다

Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 

(... forever) 

내가 무슨 일이 일어나고 가정 나의 UserMetaData 목록 직렬화 및 SI 서버에 만든 그래서 모든 변경 사항을 다시 생성되는 것을 내 로컬 사본은 반영되지 않습니다. 나는 이것을 어떻게 지나야하는지 잘 모릅니다.

마크 심스 (Mark Simms)는 2010 년에 정적 데이터 소스를 사용하는 방법을 설명한 blog posts about using reference streams in StreamInsight을 작성했으며 다음 게시물은 SQL Server 사용에 대해 설명합니다.

불행히도 그 소식은 결코 일어나지 않았습니다.

편집 : 나는 마크 심스 '게시물에 그 마하이 게시물의 클래스를 변경하고-혼란을 해제하고 내 프로세스를 자세히 설명하려고했습니다.

답변

0

귀하의 가정이 올바른 것입니다. .NET 클래스는 StreamInsight 엔진에 포함되지 않습니다. 클래스는 스키마 만 (페이로드의 모양)에 사용됩니다. 그래서 ... 참조 데이터를 어떻게 변경합니까? 첫째, 소스를 주기적으로 새로 고쳐야합니다.이 기간은 데이터 변경 빈도에 따라 다릅니다. 그런 다음 참조 스트림의 경우 데이터에 관계없이 CTI를 큐에 넣기 위해 타이머를 사용하거나 (데이터 이동에 상관없이) CTI를 데이터 스트림에서 가져 오는 방법이 필요합니다. 첫 번째 방법은 가장 쉽지만 두 번째 방법은 참조 스트림을 데이터 스트림에서 사용하는 타임 스탬프에 연결하고 실시간 시나리오가 아닌 재생 시나리오에서 작동하므로 더 유연합니다. 마지막으로, 참조 이벤트가 만료되도록하고 새로운 참조 데이터가 추가되면 대체해야합니다. 이것은 "To Signal"패턴 (변경/클립)을 사용하여 수행됩니다. 여기에도 옵션이 있습니다. 참조 소스가 변경 사항을 큐에 넣을만큼 "스마트"한 경우 참조 이벤트의 수명을 TimeSpan.MaxValue로 변경 한 다음 참조 데이터는 취소 될 때까지 유효합니다. 그러나 모든 참조 이벤트를 다시로드하려는 경우 새로 고침 빈도보다 약간 긴 이벤트 기간을 변경 한 다음 클립 할 수 있습니다. 이 메소드는 스트림에서 참조 이벤트를 제거 할 수도있다. (삭제 등의 경우) 참조 데이터의 마지막 문제는 타임 스탬프를 처리하는 방법이다. 대부분의 샘플 시나리오에서 데이터 타임 라인은 시스템 클록을 기반으로합니다 ... 항상 그런 것은 아닙니다. 이러한 시나리오에서도 데이터 이벤트가 이미 진행되는 동안 대기중인 참조 이벤트의 경쟁 조건으로 인해 시작할 때 일부 조인을 "놓칠"수 있습니다. 이 경우, 참조 데이터 및 터무니없이 늦은 종료 날짜 (2100 년 1 월 1 일)에 대해 터무니없이 조기 시작 날짜 (1970 년 1 월 1 일)를 사용하고 간격으로 대기열에 넣는 것이 좋습니다. 그러나이 경우 데이터 스트림에서 CTI를 가져와야하며 가져온 CTI 및 기타 동기화 작업을 직접 위반하지 않도록 참조 이벤트의 시작 날짜를 수정해야합니다. 어댑터/쿼리 모델은 매우 능숙하게 처리했지만 Reactive 모델은 그렇지 않습니다 ... 그러나 Reactive 모델을 사용하면 주제를 사용하여이 모든 작동 방식을 실제로 미세하게 조정할 수 있으므로 상당히 유연합니다.

+0

"먼저, 소스는 주기적으로 갱신해야합니다." 배포 된 소스를 새로 고치는 방법을 설명 할 수 있습니까? 동일한 이름의 새 소스를 배포하려고 시도한 후 추가 정보 "개체 'cep :/Server/Application/referenceTest/Entity/refStream'이 (가) 이미 존재하는 _Microsoft.ComplexEventProcessing.InvalidDefinitionException_을 받았습니다. – voqk

0

테스트에서 나는 var referenceData = List<UserMetaData>()...을 메인에서 벗어나 로컬 변수 대신 정적 멤버로 선언했습니다.

class Program 
{ 
    // *NOW STATIC* 
    private static List<UserMetaData> referenceData = new List<UserMetaData>() 
    { 
     new UserMetaData() {UserId = 1, Name = "Fred Jones", Location = "Seattle"}, 
     new UserMetaData() {UserId = 2, Name = "Bob Murphy", Location = "Portland"} 
    }; 

    public static void Main(string[] args) 
    { 
     // ... 

이제 출력에 반영됩니다 데이터베이스에 변경이 ...

Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Voqk, ID: 3, Location: Houston 
Name: Fred Jones, ID: 1, Location: Seattle 
Name: Bob Murphy, ID: 2, Location: Portland 
Name: Voqk, ID: 3, Location: Houston