2017-11-10 5 views
0

나는 비동기 작업을 수행하고 관찰 가능한 시퀀스에 결과를 게시하는 ICommand을 작성하고 있습니다. 결과는 게으르다. 누군가가 결과에 동의하지 않으면 아무 일도 일어나지 않는다. 사용자가 결과에 대한 구독을 취소하면 취소됩니다. 아래 코드는 (대단히 단순화 된) 일반적으로 작동합니다. 까다로운 점은 Execute를 호출 할 때 결과에 ​​많은 구독자가 있더라도 비동기 작업을 한 번만 수행하기를 원합니다. 결과를 게시하기 전에 Replay.RefCount을 수행해야한다고 생각했습니다. 그러나 이것은 효과가 없습니다. 적어도 관측 가능한 함수가 빨리 완료되면 테스트에서 작동하지 않습니다. 첫 번째 가입자는 완료 메시지를 포함하여 전체 결과를 가져오고 게시 된 결과를 폐기 한 다음 두 번째 구독자에 대해 완전히 다시 작성합니다. 이 기능을 사용하기 위해 사용한 해킹 중 하나는 실행 기능의 끝 부분에 1 tick 지연을 삽입하는 것입니다. 이것은 두 번째 가입자가 와서 결과를 얻을 수있는 충분한 시간을 제공합니다.IObservable이 한 번만 열거/실행되도록하는 방법

이 해킹이 합법적입니까? 나는 그것이 어떻게 작동하는지 또는 비 - 테스트 시나리오에서 견디지 못할지에 대해서는 잘 모른다.

결과가 한 번만 열거되도록하기위한 덜 해킹 된 방법은 무엇입니까? 내가 생각할 수있는 한 가지는 사용자가 결과에 가입하면 결과를 ReplaySubject으로 복사하여 게시하는 것입니다. 하지만 어떻게 작동시키는 지 알 수는 없었습니다. 첫 번째 가입자는 결과를 계산하고 ReplaySubject에 채워야하지만 두 번째 구독자는 ReplaySubject를 볼 수 있어야합니다. 어쩌면 이건 일종의 정의 야. Observable.Create.

public class AsyncCommand<T> : IObservable<IObservable<T>> 
{ 
    private readonly Func<IObservable<T>> _execute; 
    Subject<IObservable<T>> _results; 

    public AsyncCommand(Func<IObservable<T>> execute) 
    { 
     _execute = execute; 
     _results = new Subject<IObservable<T>>(); 
    } 

    // This would be ICommand.Execute, but I've simplified here 
    public void Execute() => _results.OnNext(
     _execute() 
     .Delay(TimeSpan.FromTicks(1)) // Take this line out and the test fails 
     .Replay() 
     .RefCount()); 

    // Subscribe to the inner observable to see the results of command execution 
    public IDisposable Subscribe(IObserver<IObservable<T>> observer) => 
     _results.Subscribe(observer); 
} 

[TestClass] 
public class AsyncCommandTest 
{ 
    [TestMethod] 
    public void IfSubscribeManyTimes_OnlyExecuteOnce() 
    { 
     int executionCount = 0; 
     var cmd = new AsyncCommand<int>(() => Observable.Create<int>(obs => 
     { 
      obs.OnNext(Interlocked.Increment(ref executionCount)); 
      obs.OnCompleted(); 
      return Disposable.Empty; 
     })); 
     cmd.Merge().Subscribe(); 
     cmd.Merge().Subscribe(); 
     cmd.Execute(); 
     Assert.AreEqual(1, executionCount); 
    } 
} 

다음은 ReplaySubject를 사용하는 방법입니다. 작동하지만 결과가 느리게 게시되지 않고 구독이 손실됩니다. 결과 구독을 취소해도 작업이 취소되지는 않습니다.

public void Execute() 
{ 
    ReplaySubject<T> result = new ReplaySubject<T>(); 
    var lostSubscription = _execute().Subscribe(result); 
    _results.OnNext(result); 
} 

답변

1

이것은 작동하는 것 같습니다.

public void Execute() 
{ 
    int subscriptionCount = 0; 
    int executionCount = 0; 
    var result = new ReplaySubject<T>(); 
    var disposeLastSubscription = new Subject<Unit>(); 
    _results.OnNext(Observable.Create<T>(obs => 
    { 
     Interlocked.Increment(ref subscriptionCount); 
     if (Interlocked.Increment(ref executionCount) == 1) 
     { 
      IDisposable copySourceToReplay = Observable 
       .Defer(_execute) 
       .TakeUntil(disposeLastSubscription) 
       .Subscribe(result); 
     } 
     return new CompositeDisposable(
      result.Subscribe(obs), 
      Disposable.Create(() => 
      { 
       if (Interlocked.Decrement(ref subscriptionCount) == 0) 
       { 
        disposeLastSubscription.OnNext(Unit.Default); 
       } 
      })); 
    })); 
}