나는 IObservable<T>
메서드에서 반환해야 할 상황이 있습니다. 객체가 변경되면 항상 객체의 새 값 T
을 반환합니다.Rx.net 및 The Actor 모델
개체가 변경된 트리거는 액터 시스템에서 가져옵니다. 예를 들어 Proto.Actor을 사용하고 있습니다.
나는 예를 입수했습니다 "작업"그러나 나는 그것이 제대로 IObservable
을 구현하지 않는 등 탈퇴 적절한 허용하지 않습니다 느낄
내 솔루션 : 나는 기분
public IObservable<T> Watch<T>(long id)
{
var subscriber = _actorFactory.GetActor<ObjectChangedActor>();
var request = new ObjectWatchRequest(id);
return Observable.Create<T>(async (observer, cancellationToken) =>
{
await Task.Run(async() =>
{
while (true)
{
var response = await subscriber.RequestAsync<ObjectUpdatedResponse>(request, cancellationToken);
// once it gets past the above line, we have gotten notice that the object has been updated
var updatedObj = await Get<T>(id);
observer.OnNext(updatedObj);
}
}, cancellationToken);
// will this ever be called?
return Disposable.Create(() =>
{
Console.WriteLine("Disposal called");
});
});
}
, 모든 업데이트가 구독자에게 푸시된다는 점에서 "작동"한다는 사실에도 불구하고 정확하게 처리 할 수는 없으며 별도의 스레드에 있더라도 while (true)
을 코드에 삽입해야한다고 생각하지 않습니다. .
배우 시스템에서 되돌아 오는 메시지 목록을 처리하고이를 사용자가 처분 할 수있는 IObservable
(탈퇴)으로 바꾸는 최선의 방법은 무엇입니까?
예, 정말로 저에게 점수를줍니다. 업데이트 된 솔루션에서 이미 .Repeat()를 사용했지만 Observable.Defer를 사용하지 않았습니다. 여전히 Observable.Create와는 다른 점은 100 %가 아니지만 좀 더 읽을 거리가 있습니다. – DenverCoder9