2017-12-19 10 views
1

나는 IObservable<T> 메서드에서 반환해야 할 상황이 있습니다. 객체가 변경되면 항상 객체의 새 값 T을 반환합니다.Rx.net 및 The Actor 모델

개체가 변경된 트리거는 액터 시스템에서 가져옵니다. 예를 들어 Proto.Actor을 사용하고 있습니다.

나는 예를 입수했습니다 "작업"그러나 나는 그것이 제대로 IObservable을 구현하지 않는 등 탈퇴 적절한 허용하지 않습니다 느낄

내 솔루션 : 나는 기분

public IObservable<T> Watch<T>(long id) 
    { 
     var subscriber = _actorFactory.GetActor<ObjectChangedActor>(); 
     var request = new ObjectWatchRequest(id); 

     return Observable.Create<T>(async (observer, cancellationToken) => 
     { 
      await Task.Run(async() => 
      { 
       while (true) 
       { 
        var response = await subscriber.RequestAsync<ObjectUpdatedResponse>(request, cancellationToken); 
        // once it gets past the above line, we have gotten notice that the object has been updated 
        var updatedObj = await Get<T>(id); 
        observer.OnNext(updatedObj); 
       } 
      }, cancellationToken); 

      // will this ever be called? 
      return Disposable.Create(() => 
      { 
       Console.WriteLine("Disposal called"); 
      }); 
     }); 
    } 

, 모든 업데이트가 구독자에게 푸시된다는 점에서 "작동"한다는 사실에도 불구하고 정확하게 처리 할 수는 없으며 별도의 스레드에 있더라도 while (true)을 코드에 삽입해야한다고 생각하지 않습니다. .

배우 시스템에서 되돌아 오는 메시지 목록을 처리하고이를 사용자가 처분 할 수있는 IObservable (탈퇴)으로 바꾸는 최선의 방법은 무엇입니까?

답변

1

코드의 전체 작동 예제를 제공하지 않았기 때문에 복사하여 붙여 넣기 만 가능한 실용적인 솔루션을 제공하기가 어려웠습니다. 그러나, 나는 당신이 아마 일할 수 있다고 생각되는 잘린 버전을 만들었습니다.

귀하의 솔루션이 적절한 구현 방법이 아니라고 생각해도됩니다. 이 당신을 위해 마크 안타 알려줘

public IObservable<T> Watch<T>(long id) 
{ 
    return 
     Observable 
      .Defer(() => 
       from x in Observable.FromAsync(() => RequestAsync()) 
       from y in Observable.FromAsync(() => Get<T>(id)) 
       select y) 
      .Repeat(); 
} 

:

다음은 당신의 물건을 올려 일단 권리에 대해 작동합니다 코드의 기본 버전입니다.

+0

예, 정말로 저에게 점수를줍니다. 업데이트 된 솔루션에서 이미 .Repeat()를 사용했지만 Observable.Defer를 사용하지 않았습니다. 여전히 Observable.Create와는 다른 점은 100 %가 아니지만 좀 더 읽을 거리가 있습니다. – DenverCoder9