2017-05-11 7 views
0

IObservable에서 ToEnumerable 확장자를 사용하면 사용자가 모든 요소를 ​​반복하지 않을 수 있습니다. 이 경우 Observable.CreateIDisposable이 (가) 적절하게 처리 될 수 있습니까?Reactive Extensions ToEnumerable 모든 것을 반복하지 않으면 관찰 가능한 상태를 처리하는 방법

IObservable을 사용자에게 직접 반환하는 옵션이 없다고 가정 해 봅시다.이 경우 사용자가 직접 취소를 구현할 수 있습니다.

private IObservable<Object> MakeObservable() 
{ 
    return Observable.Create(async (observer, cancelToken) => 
    { 
    using(SomeDisposable somedisposable = new SomeDisposable()) 
    { 
     while(true) 
     { 
      Object result = somedisposable.GetNextObject(); 
      if(result == null) 
      { 
      break; 
      } 
      observer.OnNext(result); 
     }  
    } 
    } 
} 

public IEnumerable<Object> GetObjects() 
{ 
    return MakeObservable().ToEnumerable(); 
} 

public void Test() 
{ 
    IEnumerable<Object> e = GetObjects(); 
    int i = 0; 
    foreach(Object o in e) 
    { 
    if(i++ == 10) 
     break; 
    } 
    //somedisposable is not disposed here!!! 
} 

답변

1

Create의 정의가 반환되지 않으므로 중지 할 수 없습니다. 관찰 가능한 소스를 타이머를 기반으로 변경하면 잘 작동합니다. 당신이이 출력을 얻을 실행하면

public IEnumerable<long> GetObjects() 
{ 
    return Observable 
     .Interval(TimeSpan.FromSeconds(1.0)) 
     .Finally(() => Console.WriteLine("Done.")) 
     .ToEnumerable(); 
} 

public void Test() 
{ 
    foreach (long i in GetObjects()) 
    { 
     Console.WriteLine(i); 
     if (i == 10) 
     { 
      break; 
     } 
    } 
} 

: 그것은 명확하게 관찰 소스에 OnCompleted를 부르고

 
0 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
Done. 

이 코드를 사용해보십시오.

+0

감사합니다. Interval을 사용하지는 않겠지 만,이 답변은 왜 일어나는지를 이해하는 데 도움이되었습니다. foreach를 종료 할 때 취소되는 것처럼 보이는 cancelToken을 더 많이 사용할 수 있다는 것을 깨달았습니다. –