나는 비동기 작업을 수행하고 관찰 가능한 시퀀스에 결과를 게시하는 ICommand
을 작성하고 있습니다. 결과는 게으르다. 누군가가 결과에 동의하지 않으면 아무 일도 일어나지 않는다. 사용자가 결과에 대한 구독을 취소하면 취소됩니다. 아래 코드는 (대단히 단순화 된) 일반적으로 작동합니다. 까다로운 점은 Execute를 호출 할 때 결과에 많은 구독자가 있더라도 비동기 작업을 한 번만 수행하기를 원합니다. 결과를 게시하기 전에 Replay.RefCount
을 수행해야한다고 생각했습니다. 그러나 이것은 효과가 없습니다. 적어도 관측 가능한 함수가 빨리 완료되면 테스트에서 작동하지 않습니다. 첫 번째 가입자는 완료 메시지를 포함하여 전체 결과를 가져오고 게시 된 결과를 폐기 한 다음 두 번째 구독자에 대해 완전히 다시 작성합니다. 이 기능을 사용하기 위해 사용한 해킹 중 하나는 실행 기능의 끝 부분에 1 tick 지연을 삽입하는 것입니다. 이것은 두 번째 가입자가 와서 결과를 얻을 수있는 충분한 시간을 제공합니다.IObservable이 한 번만 열거/실행되도록하는 방법
이 해킹이 합법적입니까? 나는 그것이 어떻게 작동하는지 또는 비 - 테스트 시나리오에서 견디지 못할지에 대해서는 잘 모른다.
결과가 한 번만 열거되도록하기위한 덜 해킹 된 방법은 무엇입니까? 내가 생각할 수있는 한 가지는 사용자가 결과에 가입하면 결과를 ReplaySubject
으로 복사하여 게시하는 것입니다. 하지만 어떻게 작동시키는 지 알 수는 없었습니다. 첫 번째 가입자는 결과를 계산하고 ReplaySubject에 채워야하지만 두 번째 구독자는 ReplaySubject를 볼 수 있어야합니다. 어쩌면 이건 일종의 정의 야. Observable.Create
.
public class AsyncCommand<T> : IObservable<IObservable<T>>
{
private readonly Func<IObservable<T>> _execute;
Subject<IObservable<T>> _results;
public AsyncCommand(Func<IObservable<T>> execute)
{
_execute = execute;
_results = new Subject<IObservable<T>>();
}
// This would be ICommand.Execute, but I've simplified here
public void Execute() => _results.OnNext(
_execute()
.Delay(TimeSpan.FromTicks(1)) // Take this line out and the test fails
.Replay()
.RefCount());
// Subscribe to the inner observable to see the results of command execution
public IDisposable Subscribe(IObserver<IObservable<T>> observer) =>
_results.Subscribe(observer);
}
[TestClass]
public class AsyncCommandTest
{
[TestMethod]
public void IfSubscribeManyTimes_OnlyExecuteOnce()
{
int executionCount = 0;
var cmd = new AsyncCommand<int>(() => Observable.Create<int>(obs =>
{
obs.OnNext(Interlocked.Increment(ref executionCount));
obs.OnCompleted();
return Disposable.Empty;
}));
cmd.Merge().Subscribe();
cmd.Merge().Subscribe();
cmd.Execute();
Assert.AreEqual(1, executionCount);
}
}
다음은 ReplaySubject를 사용하는 방법입니다. 작동하지만 결과가 느리게 게시되지 않고 구독이 손실됩니다. 결과 구독을 취소해도 작업이 취소되지는 않습니다.
public void Execute()
{
ReplaySubject<T> result = new ReplaySubject<T>();
var lostSubscription = _execute().Subscribe(result);
_results.OnNext(result);
}