2017-11-15 11 views
0

RxJava2와 함께 영역을 사용하고 있습니다. 영역의 인스턴스가 프로그래머에 의해 닫혀 야하거나 자동으로 닫혀 야합니까?영역 - 확장 확장자 RxJava2

나는이 체인으로 문제가있다 : 그것은 사용할 수 없게,이 영역의 인스턴스가 이미 닫혔습니다 :

return Observable.merge(
     getEvents 
     .toObservable() 
     , 
     mChangeEventsNotification 
     .flatMapMaybe(notification -> 
      getEvents 
      .firstElement() 
     ) 
    ) 

영역은 java.lang.IllegalStateException가 발생합니다.

getEvents의 구현은 getList()와 같습니다.

구현 RealmObservableFactory를 보았습니다.

연산자 연산자 firstElement 닫기 영역 첫 번째 인스턴스 병합 연산자를 관찰 할 수 있습니까?

영역 인스턴스는 한 스레드 내의 모든 관찰자간에 공유됩니까?

렐름 버전 4.3.3의 버그로 보입니다. 내가 4.1.0 버전으로 다운 그레이드 할 때 everythink는 괜찮습니다. ref 카운트의 문제 일 수 있습니다.

답변

0

귀하의 질문에 대답하기 위해, 나는 수신 통합을위한 소스 코드를 요청할 것입니다 :

public Flowable<RealmResults<E>> asFlowable() { 
    if (realm instanceof Realm) { 
     return realm.configuration.getRxFactory().from((Realm) realm, this); 

RxObservableFactory가하는 기본 RealmObservableFactory,입니다 여기서 무슨 일이

@Override 
public <E> Flowable<RealmResults<E>> from(final Realm realm, final RealmResults<E> results) { 
    final RealmConfiguration realmConfig = realm.getConfiguration(); 
    return Flowable.create(new FlowableOnSubscribe<RealmResults<E>>() { 
     @Override 
     public void subscribe(final FlowableEmitter<RealmResults<E>> emitter) throws Exception { 
      // Gets instance to make sure that the Realm is open for as long as the 
      // Observable is subscribed to it. 
      final Realm observableRealm = Realm.getInstance(realmConfig); 
      resultsRefs.get().acquireReference(results); 
      final RealmChangeListener<RealmResults<E>> listener = new RealmChangeListener<RealmResults<E>>() { 
       @Override 
       public void onChange(RealmResults<E> results) { 
        if (!emitter.isCancelled()) { 
         emitter.onNext(results); 
        } 
       } 
      }; 
      results.addChangeListener(listener); 

      // Cleanup when stream is disposed 
      emitter.setDisposable(Disposables.fromRunnable(new Runnable() { 
       @Override 
       public void run() { 
        results.removeChangeListener(listener); 
        observableRealm.close(); 
        resultsRefs.get().releaseReference(results); 
       } 
      })); 

      // Emit current value immediately 
      emitter.onNext(results); 

     } 
    }, BACK_PRESSURE_STRATEGY); 
} 

하는 경우 asFlowable()을 호출 한 RealmResults의 Realm 인스턴스가 RealmConfiguration을 얻는 데 사용되며 Refable 수가 Flowable이 작성된 스레드 (UI 스레드 또는 손)에서 증가한다는 것입니다 ler 스레드).

public Flowable<List<MyObject>> getList() { 
    try(Realm realm = Realm.getDefaultInstance()) { 
     return realm.where(MyObject.class) 
        .findAllAsync() 
        .asFlowable() 
        .filter(RealmResults::isLoaded); 
    } // auto-close 
} 

을 그리고이 유동성과 관련된 영역은 당신이뿐만 아니라 유동성 구독을 취소하지 않는 한 오픈있을 것입니다 :

이 경우에도 다음과 같은 시나리오가 작동 것을 의미합니다.


질문에 대답하려면 네가 어딘가에서 RealmResults를 얻은 영역 인스턴스를 닫아야합니다.