2017-10-02 4 views
3

레거시 이벤트 기반 방법을 관찰 가능 기반으로 변환하고 싶습니다. 그러나 Rx를 매우 신중하게 사용하고 있으므로 지금 당황합니다.Rx : 일정 기간 동안 첫 번째 항목 기다림

나는 이벤트 소스를 가지고 있는데, 지금은 관찰 할 수 있습니다. 특정 시점에서 나는 라인의 다음 요소를 리턴함으로써 끝나는 메소드를 시작해야한다.

이벤트 기반의 접근 방식은 다음과 같다 : 당신이 볼 수 있듯이

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor) 
{ 
    ReaderEvent result = null; 
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken })) 
    { 
     cts.CancelAfter(waitFor); 

     EventHandler<ReaderEvent> localHandler = (o, e) => 
     { 
      if (e.PlaceId == PlaceId) 
      { 
       result = e; 
       cts.Cancel(); 
      } 
     }; 

     ReaderEventHandler += localHandler; 
     try 
     { 
      await Task.Delay(waitFor, cts.Token).ConfigureAwait(false); 
     } 
     catch (OperationCanceledException) { } 
     catch (Exception ex) 
     { 
      //... 
     } 

     ReaderEventHandler -= localHandler; 
    } 

    return result; 
} 

, 아이디어는 지연이 취소됩니다 내가 대기하고있는 이벤트 또는 토큰 소스의 도착에 의해 중 하나를 취소한다는 것입니다 그 특정 시간이 지나면 구성에 따라. 아주 깨끗합니다. 이제

의 수신 버전 : 나는 너무 제한 시간 연장과 시도

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor) 
{ 
    ReaderEvent result = null; 

    var observable = _OnReaderEvent.FirstAsync(r => r.PlaceId == PlaceId); 

    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken })) 
    { 
     cts.CancelAfter(waitFor); 
     using (observable.Subscribe(x => { 
      result = x; 
      cts.Cancel(); 
     { 
      try 
      { 
       await Task.Delay(waitFor, cts.Token).ConfigureAwait(false); 
      } 
      catch (OperationCanceledException) { } 
     } 
    } 
    return result; 
} 

너무 깨끗하지 ... 더 악화 ... . 그러나 이것이 원샷 구독이기 때문에 구독을 처리하기 전에 어떻게 든 기다려야합니다. 유일한 차이점은 OnError가 CancelAfter의 기본 제공 메커니즘이 아니라 로컬 토큰을 취소한다는 것입니다.

반죽/더 간결한 (Rx에 더 의존하는) 방법이 있습니까?

감사합니다.

답변

1

왜 그냥 코드의 간단한 수신 버전으로 이동하지 :

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor) 
{ 
    return await 
     _OnReaderEvent 
      .Where(r => r.PlaceId == PlaceId) 
      .Buffer(waitFor, 1) 
      .Select(xs => xs.FirstOrDefault()) 
      .FirstOrDefaultAsync() 
      .ToTask(); 
} 
+0

이 방법으로 종료되지 않습니다. 그러나 .ToTask() 전에 .FirstOrDefaultAsync()를 추가하면 작동합니다. 예,이 모든 것이 가장 깨끗합니다. 취소 토큰을 완벽하게 추가 할 수 있습니다. 고맙습니다! – ZorgoZ

+0

@ ZorgoZ - 취소 토큰 코드를 사용하면 기본 관찰 대상을 실제로 취소하지 않으므로 사용할 수 없습니다. 조금 무의미합니다. – Enigmativity

3

당신이 시도 할 수 :

var values = await _OnReaderEvent 
    .Where(r => r.PlaceId == placeId) 
    .Buffer(waitFor, 1) 
    .FirstAsync(); // get list of matching elements during waitFor time 

return values.FirstOrDefault(); // return first element or null if the list is empty 
+0

글쎄, 그것은이 경우 R의 모양은 IList의 이지만, 다른 방법으로 약속 한 것으로 보입니다. var values ​​= 대기 _OnReaderEvent.FirstAsync (r => r.PlaceId == PlaceId) .Buffer (waitFor); 반환 값입니다 .FirstOrDefault(); 여전히 버퍼()가 차단 된 것으로 보입니다. 라인에 이벤트가 없으면 종료되지 않습니다. ( – ZorgoZ

+0

버퍼 반환 목록 , 여기서 T는 관찰 가능한 항목 유형입니다. waitFor 시간 창의 생성 된 모든 항목의 목록이며 모든 waitFor 빈 목록을 반환해야합니다 . 버퍼 -> FirstAsync를하면 첫 번째 목록이 표시됩니다. https://msdn.microsoft.com/en-us/library/hh229813(v=vs.103).aspx –

+2

아 .. 요점이 있습니다. 그럼 내가 원하는 건 : var values ​​= 기다리고 _OnReaderEvent.Where (r => r.PlaceId == PlaceId) .Buffer (waitFor, 1) .FirstAsync(); returns values.FirstOrDefault(); 당신은 나를 오른쪽으로 지적했다. 방향! – ZorgoZ