저는 RxJava를 처음 사용했습니다. 플랫 맵은 방출 된 항목을 관찰 가능하게 매핑하는 것입니다. 또한, documentation에 기반하여 방출 된 관측 값은 모두 단일 관측 가능 스트림으로 결합 (병합)된다는 것을 알고 있습니다.RxJava flat map : 결과 관찰 점 중 하나가 완료되면 어떻게됩니까?
내면 관측 대상 중 일부가 완료되면 어떻게되는지 궁금합니다.
예 : 항목 데이터 키를내는 관찰 가능 항목이 있습니다. 서버에서 항목 데이터를 가져 오기 위해 다른 비동기 http 호출을해야하므로 다른 observable을 사용하여 호출합니다. 평면 맵을 사용하여이 두 가지를 연결하고 하나의 주된 관측치를 만듭니다.
"SomeMethodThatWantsItems"다음에 오는 run() 메소드는 언제 호출됩니까?
public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine)
{
Consumer<Item> onNextConsumer =
Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word");
searchObservable
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<Item>(){
@Override
public void accept(@NonNull Item item) throws Exception {
//Do stuff with the item
}
}
, new Consumer<Exception>() { //some implementation of onErrorConsumer
}
//OnComplete
, new Action(){
@Override
public void run() throws Exception {
//When does this get called??? after the search complete or when the first http call is successful?
}
});
}
private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord)
{
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception {
//Assume that our search engine call onFind everytime it finds something
searchEngine.addSearchListener(new searchEngineResultListener(){
@Override
public void onFind(String foundItemKey){
emitter.onNext(foundItemKey);
}
@Override
public void onFinishedFindingResults(){
emitter.onComplete();
}
});
}
});
}
private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key)
{
return Observable.create(new ObservableOnSubscribe<Item>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception {
//Call the server to get the item
httpCaller.call(key, new onCompleteListener(){
@Override
public void onCompletedCall(Item result)
{
emitter.onNext(result);
//The result is complete! end the stream
emitter.onComplete();
}
});
}
});
}
public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){
//Where everything comes together
Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord);
retuern searchResultObservable
.observeOn(Schedulers.newThread())
.flatMap(new Function<String, Observable<Item>>(){
@Override
public Observable<Item> apply(String key){
return getItemByKey(httpCaller, key);
}
});
}