2017-04-16 5 views
0

저는 RxJava를 처음 사용했습니다. 플랫 맵은 방출 된 항목을 관찰 가능하게 매핑하는 것입니다. 또한, documentation에 기반하여 방출 된 관측 값은 모두 단일 관측 가능 스트림으로 결합 (병합)된다는 것을 알고 있습니다.RxJava flat map : 결과 관찰 점 중 하나가 완료되면 어떻게됩니까?

내면 관측 대상 중 일부가 완료되면 어떻게되는지 궁금합니다.

예 : 항목 데이터 키를내는 관찰 가능 항목이 있습니다. 서버에서 항목 데이터를 가져 오기 위해 다른 비동기 http 호출을해야하므로 다른 observable을 사용하여 호출합니다. 평면 맵을 사용하여이 두 가지를 연결하고 하나의 주된 관측치를 만듭니다.

"SomeMethodThatWantsItems"다음에 오는 run() 메소드는 언제 호출됩니까?

public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine) 
{ 
    Consumer<Item> onNextConsumer = 
    Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word"); 
    searchObservable 
      .subscribeOn(Schedulers.newThread()) 
      .subscribe(new Consumer<Item>(){ 
          @Override 
          public void accept(@NonNull Item item) throws Exception { 
           //Do stuff with the item 
          } 
         } 
       , new Consumer<Exception>() { //some implementation of onErrorConsumer 
        } 
       //OnComplete 
       , new Action(){ 

         @Override 
         public void run() throws Exception { 
          //When does this get called??? after the search complete or when the first http call is successful? 
         } 
        }); 

} 

private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord) 
{ 
    return Observable.create(new ObservableOnSubscribe<String>() { 
     @Override 
     public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception { 

      //Assume that our search engine call onFind everytime it finds something 
      searchEngine.addSearchListener(new searchEngineResultListener(){ 
       @Override 
       public void onFind(String foundItemKey){ 
        emitter.onNext(foundItemKey); 
       } 

       @Override 
       public void onFinishedFindingResults(){ 
        emitter.onComplete(); 
       } 
      }); 

     } 
    }); 
} 

private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key) 
{ 

    return Observable.create(new ObservableOnSubscribe<Item>() { 
     @Override 
     public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception { 

      //Call the server to get the item 
      httpCaller.call(key, new onCompleteListener(){ 
       @Override 
       public void onCompletedCall(Item result) 
       { 
        emitter.onNext(result); 
        //The result is complete! end the stream 
        emitter.onComplete(); 
       } 
      }); 
     } 
    }); 
} 

public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){ 
    //Where everything comes together 
    Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord); 
    retuern searchResultObservable 
      .observeOn(Schedulers.newThread()) 
      .flatMap(new Function<String, Observable<Item>>(){ 
       @Override 
       public Observable<Item> apply(String key){ 
        return getItemByKey(httpCaller, key); 
       } 
      }); 
} 

답변

3

onComplete()은 항상 한 번 호출 된 다음 스트림이 중지됩니다. (Observable Contract의 일부 임).
즉, 귀하의 경우 에서 SomeMethodThatWantsItems이 호출되고 이후에 모든 항목이 검색됩니다. 그것은 기본적 그래서 flatMap() 경우
각 내측 Observable 종료는 단순히 flatMap()만큼 스트림이 항목을 전송로 내 Observable에서 항목을 병합 소스 Observable 이너 Observable에서 항목을 평탄화 정지 소스 Observable 신호 것 내부 전체 Observable 스트림을 원본 스트림으로 소비하면 전체 스트림은 종료 이벤트 3 (onComplete())이 될 것이므로 내부 Observable이 하나 이상의 항목을 내보낼 수있는 경우 소스 스트림에서 둘 이상의 방출을 생성합니다.