2012-02-01 4 views
2

반응 확장을 사용하면 동일한 관찰 가능 항목에 2 회 구독하기 쉽습니다. observable에서 새 값을 사용할 수 있으면 두 구독자 모두이 동일한 값으로 호출됩니다.rx reactive extension : 각 가입자가 관측 대상에서 다른 값 (다음 값)을 얻는 방법?

각 관찰자가이 관찰 대상에서 다른 값 (다음 값)을 얻게 할 방법이 있습니까?
소스 서열 : 1,2,3,4,5, ...] (무한)
소스 끊임없이 미지 속도 새로운 항목을 추가하고 난 후 어떤

예.
N 가입자를 사용하는 각 항목에 대해 길이가 긴 비동기 작업을 실행하려고합니다.

1 가입자 : 1,2,4, ...
2 가입자 : 3,5, ...
...
또는
1 가입자 : 1,3, ...
2 가입자 : 2,4,5, ...
...
또는
1 가입자 : 1,3,5, ...
2 가입자 : 2,4,6, ...

+0

* 시나리오 * (달성하고자하는 상위 목표)를 설명해 주시겠습니까? –

+0

예. Sql 이벤트를 사용하여 테이블의 일부 새 항목을 가져 와서 작업이 비동기 및 다중 스레드 (예 : 각 작업이 오랜 시간 작업을 수행 할 수 있지만 어느 정도 실행될 수 있음)가되어야하는 작업 파이프 라인을 통해 전달합니다. – Softlion

+0

해결책이 있습니까? – Roar

답변

1

나는 Asti와 동의 할 것입니다.

Rx를 사용하여 대기열 (채우기 차단)을 채우고 경쟁 소비자가 대기열에서 읽도록 할 수 있습니다. 이런 식으로 한 프로세스가 어떤 이유로 더 빠르면 다른 소비자가 아직 바쁘다면 다음 제품을 잠재적으로 가져올 수 있습니다.

그러나, 좋은 조언을 원할 경우 :), 각 요소의 색인을 제공하는 Select 연산자를 사용할 수 있습니다. 그런 다음이를 구독자에게 전달할 수 있으며 모듈로 적합 할 수 있습니다. (윽! 잠재적 소스 시퀀스 등을 potentiall 부작용을 차단 새는 추상화, 매직 넘버)

var source = Obserservable.Interval(1.Seconds()) 
    .Select((i,element)=>{new Index=i, Element=element}); 

var subscription1 = source.Where(x=>x.Index%2==0).Subscribe(x=>DoWithThing1(x.Element)); 
var subscription2 = source.Where(x=>x.Index%2==1).Subscribe(x=>DoWithThing2(x.Element)); 

또한 기억이 아직도 그것이 스케줄러를 차단합니다 차단되는 경우 OnNext 핸들러에 수행 된 작업 에. 이것은 소스/프로듀서의 속도에 영향을 줄 수 있습니다. Asti의 대답이 더 나은 선택 인 또 다른 이유.

IObservable<TRet> SomeLengthyOperation(T input) 
{ 
    return Observable.Defer(() => Observable.Start(() => { 
     return someCalculatedValueThatTookALongTime; 
    }, Scheduler.TaskPoolScheduler)); 
} 

someObservableSource 
    .SelectMany(x => SomeLengthyOperation(input)) 
    .Subscribe(x => Console.WriteLine("The result was {0}", x); 

당신도 동시 작업의 수를 제한 할 수 있습니다 :

someObservableSource 
    .Select(x => SomeLengthyOperation(input)) 
    .Merge(4 /* at a time */) 
    .Subscribe(x => Console.WriteLine("The result was {0}", x); 

그것은 병합에 대한 중요합니다 (즉, 분명 :-)

+0

당신의 솔루션은 Polling을 사용하는데, 나는 절대 피하고 싶습니다. 그러나 그 생각이 있습니다. – Softlion

1

어떻하지 않은 경우

는 질문 4) 작동하려면 SomeLengthyOperation에 의해 반환 된 Observable이 Cold Observable이되어야합니다. Defer가 Observable에서 관찰하는 것입니다. Observ가 관찰자가됩니다 누군가가 구독 할 때까지 시작되지 않습니다.

+0

Merge (int)가 존재하지 않습니다? – Softlion

+0

일부 ObservableSource는 IVEservable이어야합니다. 및 SomeLengthyOperation은 병합 (int)이 일치하도록 IObservable 을 반환해야합니다. –