2017-01-27 5 views
4

이미 내가 비슷한 질문이 반응성 (Database polling with Reactive Extensions)Reactive Extensions를 사용하여 주를 어떻게 폴링합니까?

를 사용하여 데이터베이스 폴링에 좋은 질문이 있습니다,하지만 트위스트와 함께 : 나는 다음 요청으로 이전 결과에서 값을 공급해야합니다. 기본적으로,이 폴링 싶습니다 :

interface ResultSet<T> 
{ 
    int? CurrentAsOfHandle {get;} 
    IList<T> Results {get;} 
} 

Task<ResultSet<T>> GetNewResultsAsync<T>(int? previousRequestHandle); 

을 생각이 이전 요청


  1. 매 순간 내가 GetNewResultsAsync
  2. 내가 전화하고 싶은 이후의 모든 새 항목을 반환한다는 것입니다 이전 호출에서 CurrentAsOf을 인수로 사용하여 previousRequest 매개 변수
  3. 에 다음 호출을으로 전달하려고합니다. 이 버전은 기다리는 동안 messageResultSet 수집 할 수 있는지

    return Observable.Create<IMessage>(async (observer, cancellationToken) => 
    { 
        int? currentVersion = null; 
    
        while (!cancellationToken.IsCancellationRequested) 
        { 
         MessageResultSet messageResultSet = await ReadLatestMessagesAsync(currentVersion); 
    
         currentVersion = messageResultSet.CurrentVersionHandle; 
    
         foreach (IMessage resultMessage in messageResultSet.Messages) 
          observer.OnNext(resultMessage); 
    
         await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); 
        } 
    }); 
    

    이 또한주의 :실제로 기본적으로 이전


후 1 분 발생해야,보다 더 나은 방법이있다 다음 반복 (예를 들어, Scan을 사용하여 이전 결과 집합 개체를 다음 반복에 전달할 수 있다고 생각한 경우)

답변

1

이것에 CES는 : 서명과 Scan 기능이있다 :

IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator) 

하지만 당신은 누적 기 함수는 관측을 반환

IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator) 

... 뭔가를해야하고, 스캔 기능이 자동으로 감소 다음 전화에 전달하십시오. 이제 우리는이 멋진 MyObservableScan 연산자를 가지고

public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator) 
{ 
    return source 
     .Publish(_source => _source 
      .Take(1) 
      .Select(s => accumulator(initialValue, s)) 
      .SelectMany(async o => (await o.LastOrDefaultAsync()) 
       .Let(m => _source 
        .MyObservableScan(m, accumulator) 
        .StartWith(m) 
       ) 
      ) 
      .Merge() 
     ); 
} 

//Wrapper to accommodate easy Task -> Observable transformations 
public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator) 
{ 
    return source.MyObservableScan(initialValue, (a, s) => Observable.FromAsync(() => accumulator(a, s))); 
} 

//Function to prevent re-evaluation in functional scenarios 
public static U Let<T, U>(this T t, Func<T, U> selector) 
{ 
    return selector(t); 
} 

:

public static IObservable<TAccumulate> MyScan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator) 
{ 
    return source 
     .Publish(_source => _source 
      .Take(1) 
      .Select(s => accumulator(initialValue, s)) 
      .SelectMany(m => _source.MyScan(m, accumulator).StartWith(m)) 
     ); 
} 

우리가 그것을 환원 기능을 통합하는 비트를 변경할 수 있음을 감안할 때 : 여기

Scan의 가난한 사람의 기능 구현의 비교적 쉽게 문제를 해결할 수 있습니다.

var o = Observable.Interval(TimeSpan.FromMinutes(1)) 
    .MyObservableScan<long, ResultSet<string>>(null, (r, _) => Methods.GetNewResultsAsync<string>(r?.CurrentAsOfHandle)) 

테스트에서 누적 기 Task/Observable 함수가 소스의 간격보다 오래 걸리면 관찰 가능 객체가 종료된다는 사실에 유의하십시오. 이유가 확실하지 않습니다. 누군가가 고칠 수 있다면, 많은 의무가 있습니다.

0

나는 그 후로 트릭을 수행하는 Observable.Generate에 과부하가 있음을 발견했습니다. 가장 큰 단점은 async에서 작동하지 않는다는 것입니다.

public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler);

나는 나의 초기 상태로 null 전달합니다. 내 조건으로 x => true을 전달하십시오 (끝없이 여론 조사를 위해). iterate 안에는 전달 된 상태를 기반으로 실제 폴링을 수행합니다. 그런 다음 timeSelector에 폴링 간격을 반환합니다.

그래서 :

var resultSets = Observable.Generate<ResultSet<IMessage>, IEnumerable<IMessage>>(
    //initial (empty) result 
    new ResultSet<IMessage>(), 

    //poll endlessly (until unsubscription) 
    rs => true, 

    //each polling iteration 
    rs => 
    { 
     //get the version from the previous result (which could be that initial result) 
     int? previousVersion = rs.CurrentVersionHandle; 

     //here's the problem, though: it won't work with async methods :(
     MessageResultSet messageResultSet = ReadLatestMessagesAsync(currentVersion).Result; 

     return messageResultSet; 
    }, 

    //we only care about spitting out the messages in a result set 
    rs => rs.Messages, 

    //polling interval 
    TimeSpan.FromMinutes(1), 

    //which scheduler to run each iteration 
    TaskPoolScheduler.Default); 

return resultSets 
    //project the list of messages into a sequence 
    .SelectMany(messageList => messageList); 
+0

또 다른 사소한 단점은 전체 결과 집합은 다음 반복을 통해 살아있다. 문제의 버전은 '버전'만 필요하기 때문에 '메시지'부분을 가비지 수집 할 수 있습니다. –