2017-10-29 4 views
1

나는 observeable을 scan 연산자로 변환하므로 항상 현재 값과 이전 값 (또는 초기 값)을 기준으로 값을 방출합니다.RxJava의 스캔 연산자가 여러 구독자간에 상태를 유지할 수 있습니까

Subject<String> subject = PublishSubject.create(); 
Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b); 

그러면 첫 번째 관찰자가 구독하고 방출 된 모든 값을 인쇄 한 다음 일부 구독 취소 이후에 인쇄합니다.

Disposable first = observable.subscribe(System.out::println); // "zero" 
subject.onNext("one"); // "zero, one" 
first.dispose(); 

나중에 다른 가입자는 구독 : 당신이 볼 수 있듯이

Disposable second = observable.subscribe(System.out::println); // "zero" 
subject.onNext("two"); // "zero, one" 
second.dispose(); 

, 각 관찰자 먼저 이전이 사라 가입 할 때 방출되는 초기 값과 값으로 공급된다. 내가 모든 구독을 통해 scan 연산자의 내부 상태를 유지하고 달성하기 위해 싶습니다 RxJava에서이 문제에 대한 해결책이

Disposable first = observable.subscribe(System.out::println); // "zero" 
subject.onNext("one"); // "zero, one" 
first.dispose(); 
Disposable second = observable.subscribe(System.out::println); // "zero, one" 
subject.onNext("two"); // "zero, one, two" 
second.dispose(); 

있습니까?

+0

모든 구독자가 처음부터 모든 이벤트를 수신하도록 하시겠습니까? – akarnokd

+0

아니요, 구독자가 구독 할 때 방출되는 이벤트 만 받기를 원하지만 모든 이전 값을 기반으로이 이벤트의 값을 원합니다. –

+1

이 예제와 같이 'observable'이라는 Observable을 멀티 캐스트하기 만하면됩니다 : http://blog.danlew.net/2016/06/13/multicasting-in-rxjava/. 당신은 동시성이 있다면, 나는 주제를 onewext serialize하는 것이 좋습니다 것입니다. –

답변

1

또 다른 해결책

Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b); 

이있다는로 변환 할 :

observable # publish는 모든 가입자에게 결과를 멀티 캐스트하므로 마지막 #scan 값을 유지합니다. 새로운 Observable을 구독 할 때마다 새로운 Observable을 만들면 솔루션이 작동하지 않습니다. 스캔 값이 지속되지 않았습니다.

@Test 
void name3231() { 
    Subject<String> subject = PublishSubject.create(); 
    Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b); 

    ConnectableObservable<String> observableMulticast = observable.publish(); 

    Disposable connect = observableMulticast.connect(); // must be disposed by hand 

    TestObserver<String> test = observableMulticast.test(); 

    Disposable first = observableMulticast.subscribe(System.out::println); // "zero" 
    subject.onNext("one"); // "zero, one" 
    first.dispose(); 

    test.assertValues("zero, one"); 

    Disposable second = observableMulticast.subscribe(System.out::println); // "zero, one" 
    subject.onNext("two"); // "zero, one, two" 
    second.dispose(); 

    test.assertValues("zero, one", "zero, one, two"); 
} 
1

"캐싱"레이어로 BehaviorSubject을 도입하여 해결책을 찾았습니다. 문제를 해결하는 가장 깨끗한 방법인지는 확실치 않지만 작동합니다.

라인 :

Subject<String> observable = BehaviorSubject.create(); 
subject.scan("zero", (a, b) -> a + ", " + b) 
     .subscribe(observable); 
1

당신은 BehaviorSubject로 스캔 할 수 있고 다른 사람들이 그에 가입해야합니다 :

Subject<String> subject = PublishSubject.create(); 
Observable<String> observable = subject.scan("zero", (a, b) -> a + ", " + b); 
Subject<String> multicast = BehaviorSubject.create(); 

observable.subscribe(multicast); 

Disposable first = multicast.subscribe(System.out::println); // "zero" 
subject.onNext("one"); // "zero, one" 
first.dispose(); 
Disposable second = multicast.subscribe(System.out::println); // "zero, one" 
subject.onNext("two"); // "zero, one, two" 
second.dispose();