비교적 주기적으로 정적 소스를 폴링하여 Microsoft StreamInsight에서 참조 스트림을 만드는 방법은 무엇입니까?느린 움직이는 데이터를 데이터베이스에서 폴링하여 StreamInsight 참조 스트림으로 사용하는 방법?
다음은 내가 시도한 것입니다. 나는 간단한 List<UserMetaData>
여기
var referenceData = new List<UserMetaData>()
{
new UserMetaData() { UserId = 1, Name = "Fred Jones", Location = "Seattle" },
new UserMetaData() { UserId = 2, Name = "Bob Murphy", Location = "Portland" }
};
으로 사용자 메타 데이터의 데이터베이스를 표현하고있어하면 UserMetaData 클래스
public class UserMetaData
{
public int UserId { get; set; }
public string Name { get; set; }
public string Location { get; set; }
public override string ToString()
{
return string.Format(
"Name: {0}, ID: {1}, Location: {2}",
this.Name,
this.UserId,
this.Location);
}
}
나머지 예제 코드의 나머지 부분은 표준 StreamInsight를 내장 배포 설정에서 줄임표를 대체에게 있습니다. 나는 5 분 대신 이초이 하트 비트 간격 만들 수있는 실제 응용 프로그램에서
var heartbeat = app.DefineObservable(
() => Observable.Interval(TimeSpan.FromSeconds(2)));
: 먼저
using (var server = Server.Create("default"))
{
var app = server.CreateApplication("app");
// ...
}
, 나는 이런 하트 비트를 만들 수 있습니다. 어쨌든, 다음 나는 heatbeat 새로운 사용자 메타 데이터에 대한 데이터베이스 조회를 트리거 할 :
var newUserMeta = app.DefineObservable(
() => heartbeat.SelectMany(_ => referenceData))
.ToPointStreamable(
c => PointEvent.CreateInsert(DateTime.Now, c),
AdvanceTimeSettings.IncreasingStartTime);
IQbservable.SelectMany 확장 내가 referenceData에서 기대하는 IEnumerable<UserMetaData>
을 평평하게한다. _
매개 변수는 하트 비트에서 방출되는 long을 버립니다. 그런 다음 ToPointStreamable
은 IObservable<UserMetaData>
을 시작 시간이 지금 인 포인트 이벤트로 IQStreamable
으로 변환합니다. (DateTime.Now는 아마도 StreamInsight-y가 아닐 것입니다.)
그런 다음 신호로 변환하고 간단한 쿼리를 통해 콘솔 싱크를 정의하고 배포합니다.
// Convert to signal
var metaDataSignal = refStream
.AlterEventDuration(e => TimeSpan.MaxValue)
.ClipEventDuration(refStream, (e1, e2) => e1.Name == e2.Name);
// Query
var result = from t in metaDataSignal
select t;
// Define & deploy sink.
var sink = app.DefineObserver(
() => Observer.Create<UserMetaData>(c => Console.WriteLine(c)));
sink.Deploy("sink");
내 마지막 단계는 싱크대 Bind
입니다. 또한 다음 화면으로 인쇄 내 메타 데이터 폴링 하트 비트의 출력을보고 내 데이터베이스에 새 UserMetaData
레코드를 추가하고 변경 사항이 반영되어 있는지 확인하기 위해 기다리는 몇 초 기다립니다.
using (var process = result.Bind(sink).Run("process"))
{
Thread.Sleep(4000);
referenceData.Add(new UserMetaData()
{
UserId = 3,
Name = "Voqk",
Location = "Houston"
});
Console.ReadLine();
}
새로운 UserMetaData 레코드 출력에 반영되지 않습니다
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
Name: Fred Jones, ID: 1, Location: Seattle
Name: Bob Murphy, ID: 2, Location: Portland
(... forever)
내가 무슨 일이 일어나고 가정 나의 UserMetaData 목록 직렬화 및 SI 서버에 만든 그래서 모든 변경 사항을 다시 생성되는 것을 내 로컬 사본은 반영되지 않습니다. 나는 이것을 어떻게 지나야하는지 잘 모릅니다.
마크 심스 (Mark Simms)는 2010 년에 정적 데이터 소스를 사용하는 방법을 설명한 blog posts about using reference streams in StreamInsight을 작성했으며 다음 게시물은 SQL Server 사용에 대해 설명합니다.
불행히도 그 소식은 결코 일어나지 않았습니다.
편집 : 나는 마크 심스 '게시물에 그 마하이 게시물의 클래스를 변경하고-혼란을 해제하고 내 프로세스를 자세히 설명하려고했습니다.
"먼저, 소스는 주기적으로 갱신해야합니다." 배포 된 소스를 새로 고치는 방법을 설명 할 수 있습니까? 동일한 이름의 새 소스를 배포하려고 시도한 후 추가 정보 "개체 'cep :/Server/Application/referenceTest/Entity/refStream'이 (가) 이미 존재하는 _Microsoft.ComplexEventProcessing.InvalidDefinitionException_을 받았습니다. – voqk